diff --git a/controllers/vspherecluster_controller.go b/controllers/vspherecluster_controller.go index 29bfe4c2d6..cb2abe7938 100644 --- a/controllers/vspherecluster_controller.go +++ b/controllers/vspherecluster_controller.go @@ -403,19 +403,29 @@ func (r clusterReconciler) reconcileIdentitySecret(ctx *context.ClusterContext) } func (r clusterReconciler) reconcileVCenterConnectivity(ctx *context.ClusterContext) error { + params := session.NewParams(). + WithServer(ctx.VSphereCluster.Spec.Server). + WithDatacenter(ctx.VSphereCluster.Spec.CloudProviderConfiguration.Workspace.Datacenter). + WithThumbprint(ctx.VSphereCluster.Spec.Thumbprint). + WithFeatures(session.Feature{ + EnableKeepAlive: r.EnableKeepAlive, + KeepAliveDuration: r.KeepAliveDuration, + }) + if ctx.VSphereCluster.Spec.IdentityRef != nil { creds, err := identity.GetCredentials(ctx, r.Client, ctx.VSphereCluster, r.Namespace) if err != nil { return err } - _, err = session.GetOrCreate(ctx, ctx.VSphereCluster.Spec.Server, - ctx.VSphereCluster.Spec.CloudProviderConfiguration.Workspace.Datacenter, creds.Username, creds.Password, ctx.VSphereCluster.Spec.Thumbprint) + params = params.WithUserInfo(creds.Username, creds.Password) + _, err = session.GetOrCreate(ctx, params) return err } - _, err := session.GetOrCreate(ctx, ctx.VSphereCluster.Spec.Server, - ctx.VSphereCluster.Spec.CloudProviderConfiguration.Workspace.Datacenter, ctx.Username, ctx.Password, ctx.VSphereCluster.Spec.Thumbprint) + params = params.WithUserInfo(ctx.Username, ctx.Password) + _, err := session.GetOrCreate(ctx, + params) return err } diff --git a/controllers/vspherevm_controller.go b/controllers/vspherevm_controller.go index 95a54bde19..13ee506551 100644 --- a/controllers/vspherevm_controller.go +++ b/controllers/vspherevm_controller.go @@ -415,11 +415,21 @@ func (r *vmReconciler) clusterToVSphereVMs(a ctrlclient.Object) []reconcile.Requ func (r *vmReconciler) retrieveVcenterSession(ctx goctx.Context, vsphereVM *infrav1.VSphereVM) 
(*session.Session, error) { // Get cluster object and then get VSphereCluster object + + params := session.NewParams(). + WithServer(vsphereVM.Spec.Server). + WithDatacenter(vsphereVM.Spec.Datacenter). + WithUserInfo(r.ControllerContext.Username, r.ControllerContext.Password). + WithThumbprint(vsphereVM.Spec.Thumbprint). + WithFeatures(session.Feature{ + EnableKeepAlive: r.EnableKeepAlive, + KeepAliveDuration: r.KeepAliveDuration, + }) cluster, err := clusterutilv1.GetClusterFromMetadata(r.ControllerContext, r.Client, vsphereVM.ObjectMeta) if err != nil { r.Logger.Info("VsphereVM is missing cluster label or cluster does not exist") - return session.GetOrCreate(r.Context, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, - r.ControllerManagerContext.Username, r.ControllerManagerContext.Password, vsphereVM.Spec.Thumbprint) + return session.GetOrCreate(r.Context, + params) } key := client.ObjectKey{ @@ -430,8 +440,8 @@ func (r *vmReconciler) retrieveVcenterSession(ctx goctx.Context, vsphereVM *infr err = r.Client.Get(r, key, vsphereCluster) if err != nil { r.Logger.Info("VSphereCluster couldn't be retrieved") - return session.GetOrCreate(r.Context, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, - r.ControllerManagerContext.Username, r.ControllerManagerContext.Password, vsphereVM.Spec.Thumbprint) + return session.GetOrCreate(r.Context, + params) } if vsphereCluster.Spec.IdentityRef != nil { @@ -439,10 +449,12 @@ func (r *vmReconciler) retrieveVcenterSession(ctx goctx.Context, vsphereVM *infr if err != nil { return nil, errors.Wrap(err, "failed to retrieve credentials from IdentityRef") } - return session.GetOrCreate(ctx, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, creds.Username, creds.Password, vsphereVM.Spec.Thumbprint) + params = params.WithUserInfo(creds.Username, creds.Password) + return session.GetOrCreate(r.Context, + params) } // Fallback to using credentials provided to the manager - return session.GetOrCreate(r.Context, vsphereVM.Spec.Server, 
vsphereVM.Spec.Datacenter, - r.ControllerManagerContext.Username, r.ControllerManagerContext.Password, vsphereVM.Spec.Thumbprint) + return session.GetOrCreate(r.Context, + params) } diff --git a/main.go b/main.go index bd632488db..41b0f09ce9 100644 --- a/main.go +++ b/main.go @@ -24,8 +24,6 @@ import ( "os" "time" - "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha4" - "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -33,7 +31,9 @@ import ( ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha4" "sigs.k8s.io/cluster-api-provider-vsphere/controllers" + "sigs.k8s.io/cluster-api-provider-vsphere/pkg/constants" "sigs.k8s.io/cluster-api-provider-vsphere/pkg/context" "sigs.k8s.io/cluster-api-provider-vsphere/pkg/manager" "sigs.k8s.io/cluster-api-provider-vsphere/pkg/version" @@ -42,12 +42,14 @@ import ( var ( setupLog = ctrllog.Log.WithName("entrypoint") - managerOpts manager.Options - defaultProfilerAddr = os.Getenv("PROFILER_ADDR") - defaultSyncPeriod = manager.DefaultSyncPeriod - defaultLeaderElectionID = manager.DefaultLeaderElectionID - defaultPodName = manager.DefaultPodName - defaultWebhookPort = manager.DefaultWebhookServiceContainerPort + managerOpts manager.Options + defaultProfilerAddr = os.Getenv("PROFILER_ADDR") + defaultSyncPeriod = manager.DefaultSyncPeriod + defaultLeaderElectionID = manager.DefaultLeaderElectionID + defaultPodName = manager.DefaultPodName + defaultWebhookPort = manager.DefaultWebhookServiceContainerPort + defaultEnableKeepAlive = constants.DefaultEnableKeepAlive + defaultKeepAliveDuration = constants.DefaultKeepAliveDuration ) // nolint:gocognit @@ -116,6 +118,15 @@ func main() { "/etc/capv/credentials.yaml", "path to CAPV's credentials file", ) + flag.BoolVar(&managerOpts.EnableKeepAlive, + "enable-keep-alive", + defaultEnableKeepAlive, + "feature to enable keep alive handler in 
vsphere sessions") + + flag.DurationVar(&managerOpts.KeepAliveDuration, + "keep-alive-duration", + defaultKeepAliveDuration, + "idle time interval(minutes) in between send() requests in keepalive handler") flag.Parse() diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index d5016d1495..1bd3014f91 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -17,6 +17,8 @@ limitations under the License. package constants import ( + "time" + "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha3" ) @@ -48,4 +50,10 @@ const ( // MaintenanceAnnotationLabel is the annotation used to indicate a machine and/or // cluster are in maintenance mode. MaintenanceAnnotationLabel = "capv." + v1alpha3.GroupName + "/maintenance" + + // DefaultEnableKeepAlive is false by default + DefaultEnableKeepAlive = false + + // DefaultKeepAliveDuration is the default keep-alive interval (5 minutes) + DefaultKeepAliveDuration = time.Minute * 5 ) diff --git a/pkg/context/controller_manager_context.go b/pkg/context/controller_manager_context.go index 11d2e3974d..36dcde494c 100644 --- a/pkg/context/controller_manager_context.go +++ b/pkg/context/controller_manager_context.go @@ -19,6 +19,7 @@ package context import ( "context" "sync" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" @@ -77,6 +78,14 @@ type ControllerManagerContext struct { // endpoints. 
Password string + // EnableKeepAlive is a session feature to enable keep alive handler + // for better load management on vSphere api server + EnableKeepAlive bool + + // KeepAliveDuration is the idle time interval in between send() requests + // in keepalive handler + KeepAliveDuration time.Duration + genericEventCache sync.Map } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5c1bfeeca4..b50860e42c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -92,6 +92,8 @@ func New(opts Options) (Manager, error) { Scheme: opts.Scheme, Username: opts.Username, Password: opts.Password, + EnableKeepAlive: opts.EnableKeepAlive, + KeepAliveDuration: opts.KeepAliveDuration, } // Add the requested items to the manager. diff --git a/pkg/manager/options.go b/pkg/manager/options.go index a8b1f1efd0..d62c826323 100644 --- a/pkg/manager/options.go +++ b/pkg/manager/options.go @@ -47,6 +47,10 @@ type Options struct { // LeaderElectionEnabled is a flag that enables leader election. LeaderElectionEnabled bool + // EnableKeepAlive is a session feature to enable keep alive handler + // for better load management on vSphere api server + EnableKeepAlive bool + // LeaderElectionID is the name of the config map to use as the // locking resource when configuring leader election. LeaderElectionID string @@ -98,6 +102,10 @@ type Options struct { // endpoints. Password string + // KeepAliveDuration is the idle time interval in between send() requests + // in keepalive handler + KeepAliveDuration time.Duration + // WebhookPort is the port that the webhook server serves at. 
WebhookPort int diff --git a/pkg/services/govmomi/create_test.go b/pkg/services/govmomi/create_test.go index bdf69b9dd6..0ca5b4cee8 100644 --- a/pkg/services/govmomi/create_test.go +++ b/pkg/services/govmomi/create_test.go @@ -47,9 +47,10 @@ func TestCreate(t *testing.T) { vmContext.VSphereVM.Spec.Server = s.URL.Host authSession, err := session.GetOrCreate( - vmContext, - vmContext.VSphereVM.Spec.Server, "", - s.URL.User.Username(), pass, "") + vmContext.Context, + session.NewParams(). + WithServer(vmContext.VSphereVM.Spec.Server). + WithUserInfo(s.URL.User.Username(), pass)) if err != nil { t.Fatal(err) } diff --git a/pkg/services/govmomi/vcenter/clone_test.go b/pkg/services/govmomi/vcenter/clone_test.go index 020d166905..e9a8aba5c9 100644 --- a/pkg/services/govmomi/vcenter/clone_test.go +++ b/pkg/services/govmomi/vcenter/clone_test.go @@ -143,8 +143,9 @@ func initSimulator(t *testing.T) (*simulator.Model, *session.Session, *simulator authSession, err := session.GetOrCreate( ctx.TODO(), - server.URL.Host, "", - server.URL.User.Username(), pass, "") + session.NewParams(). + WithServer(server.URL.Host). 
+ WithUserInfo(server.URL.User.Username(), pass)) if err != nil { t.Fatal(err) } diff --git a/pkg/session/session.go b/pkg/session/session.go index 74212ab9ed..8899eef196 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -20,17 +20,21 @@ import ( "context" "net/url" "sync" + "time" + "github.com/go-logr/logr" "github.com/pkg/errors" "github.com/vmware/govmomi" "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/session" "github.com/vmware/govmomi/vim25" + "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/soap" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha4" + "sigs.k8s.io/cluster-api-provider-vsphere/pkg/constants" ) var sessionCache = map[string]Session{} @@ -43,33 +47,86 @@ type Session struct { datacenter *object.Datacenter } +type Feature struct { + EnableKeepAlive bool + KeepAliveDuration time.Duration +} + +func DefaultFeature() Feature { + return Feature{ + EnableKeepAlive: constants.DefaultEnableKeepAlive, + } +} + +type Params struct { + server string + datacenter string + userinfo *url.Userinfo + thumbprint string + feature Feature +} + +func NewParams() *Params { + return &Params{ + feature: DefaultFeature(), + } +} + +func (p *Params) WithServer(server string) *Params { + p.server = server + return p +} + +func (p *Params) WithDatacenter(datacenter string) *Params { + p.datacenter = datacenter + return p +} + +func (p *Params) WithUserInfo(username, password string) *Params { + p.userinfo = url.UserPassword(username, password) + return p +} + +func (p *Params) WithThumbprint(thumbprint string) *Params { + p.thumbprint = thumbprint + return p +} + +func (p *Params) WithFeatures(feature Feature) *Params { + p.feature = feature + return p +} + // GetOrCreate gets a cached session or creates a new one if one does not // already exist. 
-func GetOrCreate( - ctx context.Context, - server, datacenter, username, password string, thumbprint string) (*Session, error) { - logger := ctrl.LoggerFrom(ctx) +func GetOrCreate(ctx context.Context, params *Params) (*Session, error) { + logger := ctrl.LoggerFrom(ctx).WithName("session") sessionMU.Lock() defer sessionMU.Unlock() - sessionKey := server + username + datacenter - if cachedSession, ok := sessionCache[sessionKey]; ok { - if ok, _ := cachedSession.SessionManager.SessionIsActive(ctx); ok { - logger.V(2).Info("found active cached vSphere client session", "server", server, "datacenter", datacenter) - return &cachedSession, nil + sessionKey := params.server + params.userinfo.Username() + params.datacenter + if session, ok := sessionCache[sessionKey]; ok { + // if keepalive is enabled we depend upon roundtripper to reestablish the connection + // and remove the key if it could not + if params.feature.EnableKeepAlive { + return &session, nil + } + if ok, _ := session.SessionManager.SessionIsActive(ctx); ok { + logger.V(2).Info("found active cached vSphere client session", "server", params.server, "datacenter", params.datacenter) + return &session, nil } } - soapURL, err := soap.ParseURL(server) + soapURL, err := soap.ParseURL(params.server) if err != nil { - return nil, errors.Wrapf(err, "error parsing vSphere URL %q", server) + return nil, errors.Wrapf(err, "error parsing vSphere URL %q", params.server) } if soapURL == nil { - return nil, errors.Errorf("error parsing vSphere URL %q", server) + return nil, errors.Errorf("error parsing vSphere URL %q", params.server) } - soapURL.User = url.UserPassword(username, password) - client, err := newClient(ctx, soapURL, thumbprint) + soapURL.User = params.userinfo + client, err := newClient(ctx, logger, sessionKey, soapURL, params.thumbprint, params.feature) if err != nil { return nil, err } @@ -81,9 +138,9 @@ func GetOrCreate( session.Finder = find.NewFinder(session.Client.Client, false) // Assign the datacenter if 
one was specified. - dc, err := session.Finder.DatacenterOrDefault(ctx, datacenter) + dc, err := session.Finder.DatacenterOrDefault(ctx, params.datacenter) if err != nil { - return nil, errors.Wrapf(err, "unable to find datacenter %q", datacenter) + return nil, errors.Wrapf(err, "unable to find datacenter %q", params.datacenter) } session.datacenter = dc session.Finder.SetDatacenter(dc) @@ -91,12 +148,12 @@ func GetOrCreate( // Cache the session. sessionCache[sessionKey] = session - logger.V(2).Info("cached vSphere client session", "server", server, "datacenter", datacenter) + logger.V(2).Info("cached vSphere client session", "server", params.server, "datacenter", params.datacenter) return &session, nil } -func newClient(ctx context.Context, url *url.URL, thumprint string) (*govmomi.Client, error) { +func newClient(ctx context.Context, logger logr.Logger, sessionKey string, url *url.URL, thumprint string, feature Feature) (*govmomi.Client, error) { insecure := thumprint == "" soapClient := soap.NewClient(url, insecure) if !insecure { @@ -107,10 +164,29 @@ func newClient(ctx context.Context, url *url.URL, thumprint string) (*govmomi.Cl if err != nil { return nil, err } + c := &govmomi.Client{ Client: vimClient, SessionManager: session.NewManager(vimClient), } + + if feature.EnableKeepAlive { + vimClient.RoundTripper = session.KeepAliveHandler(vimClient.RoundTripper, feature.KeepAliveDuration, func(tripper soap.RoundTripper) error { + // we tried implementing + // c.Login here but the client once logged out + // keeps erroring with invalid username or password + // we tried with cached username and password in session still the error persisted + // hence we just clear the cache and expect the client to + // be recreated in next GetOrCreate call + _, err := methods.GetCurrentTime(ctx, tripper) + if err != nil { + logger.Error(err, "failed to keep alive govmomi client") + clearCache(sessionKey) + } + return err + }) + } + if err := c.Login(ctx, url.User); err != nil { 
return nil, err } @@ -118,6 +194,12 @@ func newClient(ctx context.Context, url *url.URL, thumprint string) (*govmomi.Cl return c, nil } +func clearCache(sessionKey string) { + sessionMU.Lock() + defer sessionMU.Unlock() + delete(sessionCache, sessionKey) +} + // FindByBIOSUUID finds an object by its BIOS UUID. // // To avoid comments about this function's name, please see the Golang