From caf0d81d82123e97bd06885d774e138a4be56c9b Mon Sep 17 00:00:00 2001 From: Deepak Sharma Date: Thu, 25 Mar 2021 20:17:33 +0530 Subject: [PATCH] session with round tripper keep alive add command line args for keep alive feature go fmt change deafults update keepalivehandler use session from func keepalive default to false specify unit duration unit changes fix go-lint refactor params gofmt constant refactor to pkg/constant gofmt use ctrl logger code comment changes fix import order make rebase fix --- controllers/vspherecluster_controller.go | 18 +++- controllers/vspherevm_controller.go | 26 +++-- main.go | 27 +++-- pkg/constants/constants.go | 8 ++ pkg/context/controller_manager_context.go | 9 ++ pkg/manager/manager.go | 2 + pkg/manager/options.go | 8 ++ pkg/services/govmomi/create_test.go | 7 +- pkg/services/govmomi/vcenter/clone_test.go | 5 +- pkg/session/session.go | 118 +++++++++++++++++---- 10 files changed, 186 insertions(+), 42 deletions(-) diff --git a/controllers/vspherecluster_controller.go b/controllers/vspherecluster_controller.go index 29bfe4c2d6..cb2abe7938 100644 --- a/controllers/vspherecluster_controller.go +++ b/controllers/vspherecluster_controller.go @@ -403,19 +403,29 @@ func (r clusterReconciler) reconcileIdentitySecret(ctx *context.ClusterContext) } func (r clusterReconciler) reconcileVCenterConnectivity(ctx *context.ClusterContext) error { + params := session.NewParams(). + WithServer(ctx.VSphereCluster.Spec.Server). + WithDatacenter(ctx.VSphereCluster.Spec.CloudProviderConfiguration.Workspace.Datacenter). + WithThumbprint(ctx.VSphereCluster.Spec.Thumbprint). 
+ WithFeatures(session.Feature{ + EnableKeepAlive: r.EnableKeepAlive, + KeepAliveDuration: r.KeepAliveDuration, + }) + if ctx.VSphereCluster.Spec.IdentityRef != nil { creds, err := identity.GetCredentials(ctx, r.Client, ctx.VSphereCluster, r.Namespace) if err != nil { return err } - _, err = session.GetOrCreate(ctx, ctx.VSphereCluster.Spec.Server, - ctx.VSphereCluster.Spec.CloudProviderConfiguration.Workspace.Datacenter, creds.Username, creds.Password, ctx.VSphereCluster.Spec.Thumbprint) + params = params.WithUserInfo(creds.Username, creds.Password) + _, err = session.GetOrCreate(ctx, params) return err } - _, err := session.GetOrCreate(ctx, ctx.VSphereCluster.Spec.Server, - ctx.VSphereCluster.Spec.CloudProviderConfiguration.Workspace.Datacenter, ctx.Username, ctx.Password, ctx.VSphereCluster.Spec.Thumbprint) + params = params.WithUserInfo(ctx.Username, ctx.Password) + _, err := session.GetOrCreate(ctx, + params) return err } diff --git a/controllers/vspherevm_controller.go b/controllers/vspherevm_controller.go index 95a54bde19..13ee506551 100644 --- a/controllers/vspherevm_controller.go +++ b/controllers/vspherevm_controller.go @@ -415,11 +415,21 @@ func (r *vmReconciler) clusterToVSphereVMs(a ctrlclient.Object) []reconcile.Requ func (r *vmReconciler) retrieveVcenterSession(ctx goctx.Context, vsphereVM *infrav1.VSphereVM) (*session.Session, error) { // Get cluster object and then get VSphereCluster object + + params := session.NewParams(). + WithServer(vsphereVM.Spec.Server). + WithDatacenter(vsphereVM.Spec.Datacenter). + WithUserInfo(r.ControllerContext.Username, r.ControllerContext.Password). + WithThumbprint(vsphereVM.Spec.Thumbprint). 
+ WithFeatures(session.Feature{ + EnableKeepAlive: r.EnableKeepAlive, + KeepAliveDuration: r.KeepAliveDuration, + }) cluster, err := clusterutilv1.GetClusterFromMetadata(r.ControllerContext, r.Client, vsphereVM.ObjectMeta) if err != nil { r.Logger.Info("VsphereVM is missing cluster label or cluster does not exist") - return session.GetOrCreate(r.Context, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, - r.ControllerManagerContext.Username, r.ControllerManagerContext.Password, vsphereVM.Spec.Thumbprint) + return session.GetOrCreate(r.Context, + params) } key := client.ObjectKey{ @@ -430,8 +440,8 @@ func (r *vmReconciler) retrieveVcenterSession(ctx goctx.Context, vsphereVM *infr err = r.Client.Get(r, key, vsphereCluster) if err != nil { r.Logger.Info("VSphereCluster couldn't be retrieved") - return session.GetOrCreate(r.Context, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, - r.ControllerManagerContext.Username, r.ControllerManagerContext.Password, vsphereVM.Spec.Thumbprint) + return session.GetOrCreate(r.Context, + params) } if vsphereCluster.Spec.IdentityRef != nil { @@ -439,10 +449,12 @@ func (r *vmReconciler) retrieveVcenterSession(ctx goctx.Context, vsphereVM *infr if err != nil { return nil, errors.Wrap(err, "failed to retrieve credentials from IdentityRef") } - return session.GetOrCreate(ctx, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, creds.Username, creds.Password, vsphereVM.Spec.Thumbprint) + params = params.WithUserInfo(creds.Username, creds.Password) + return session.GetOrCreate(r.Context, + params) } // Fallback to using credentials provided to the manager - return session.GetOrCreate(r.Context, vsphereVM.Spec.Server, vsphereVM.Spec.Datacenter, - r.ControllerManagerContext.Username, r.ControllerManagerContext.Password, vsphereVM.Spec.Thumbprint) + return session.GetOrCreate(r.Context, + params) } diff --git a/main.go b/main.go index bd632488db..41b0f09ce9 100644 --- a/main.go +++ b/main.go @@ -24,8 +24,6 @@ import ( "os" "time" - 
"sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha4" - "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -33,7 +31,9 @@ import ( ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha4" "sigs.k8s.io/cluster-api-provider-vsphere/controllers" + "sigs.k8s.io/cluster-api-provider-vsphere/pkg/constants" "sigs.k8s.io/cluster-api-provider-vsphere/pkg/context" "sigs.k8s.io/cluster-api-provider-vsphere/pkg/manager" "sigs.k8s.io/cluster-api-provider-vsphere/pkg/version" @@ -42,12 +42,14 @@ import ( var ( setupLog = ctrllog.Log.WithName("entrypoint") - managerOpts manager.Options - defaultProfilerAddr = os.Getenv("PROFILER_ADDR") - defaultSyncPeriod = manager.DefaultSyncPeriod - defaultLeaderElectionID = manager.DefaultLeaderElectionID - defaultPodName = manager.DefaultPodName - defaultWebhookPort = manager.DefaultWebhookServiceContainerPort + managerOpts manager.Options + defaultProfilerAddr = os.Getenv("PROFILER_ADDR") + defaultSyncPeriod = manager.DefaultSyncPeriod + defaultLeaderElectionID = manager.DefaultLeaderElectionID + defaultPodName = manager.DefaultPodName + defaultWebhookPort = manager.DefaultWebhookServiceContainerPort + defaultEnableKeepAlive = constants.DefaultEnableKeepAlive + defaultKeepAliveDuration = constants.DefaultKeepAliveDuration ) // nolint:gocognit @@ -116,6 +118,15 @@ func main() { "/etc/capv/credentials.yaml", "path to CAPV's credentials file", ) + flag.BoolVar(&managerOpts.EnableKeepAlive, + "enable-keep-alive", + defaultEnableKeepAlive, + "feature to enable keep alive handler in vsphere sessions") + + flag.DurationVar(&managerOpts.KeepAliveDuration, + "keep-alive-duration", + defaultKeepAliveDuration, + "idle time interval(minutes) in between send() requests in keepalive handler") flag.Parse() diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index d5016d1495..1bd3014f91 
100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -17,6 +17,8 @@ limitations under the License. package constants import ( + "time" + "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha3" ) @@ -48,4 +50,10 @@ const ( // MaintenanceAnnotationLabel is the annotation used to indicate a machine and/or // cluster are in maintenance mode. MaintenanceAnnotationLabel = "capv." + v1alpha3.GroupName + "/maintenance" + + // DefaultEnableKeepAlive is false by default + DefaultEnableKeepAlive = false + + // DefaultKeepAliveDuration is the default keep alive interval (5 minutes) + DefaultKeepAliveDuration = time.Minute * 5 ) diff --git a/pkg/context/controller_manager_context.go b/pkg/context/controller_manager_context.go index 11d2e3974d..36dcde494c 100644 --- a/pkg/context/controller_manager_context.go +++ b/pkg/context/controller_manager_context.go @@ -19,6 +19,7 @@ package context import ( "context" "sync" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" @@ -77,6 +78,14 @@ type ControllerManagerContext struct { // endpoints. Password string + // EnableKeepAlive is a session feature to enable keep alive handler + // for better load management on vSphere api server + EnableKeepAlive bool + + // KeepAliveDuration is the idle time interval in between send() requests + // in keepalive handler + KeepAliveDuration time.Duration + genericEventCache sync.Map } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 5c1bfeeca4..b50860e42c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -92,6 +92,8 @@ func New(opts Options) (Manager, error) { Scheme: opts.Scheme, Username: opts.Username, Password: opts.Password, + EnableKeepAlive: opts.EnableKeepAlive, + KeepAliveDuration: opts.KeepAliveDuration, } // Add the requested items to the manager. 
diff --git a/pkg/manager/options.go b/pkg/manager/options.go index a8b1f1efd0..d62c826323 100644 --- a/pkg/manager/options.go +++ b/pkg/manager/options.go @@ -47,6 +47,10 @@ type Options struct { // LeaderElectionEnabled is a flag that enables leader election. LeaderElectionEnabled bool + // EnableKeepAlive is a session feature to enable keep alive handler + // for better load management on vSphere api server + EnableKeepAlive bool + // LeaderElectionID is the name of the config map to use as the // locking resource when configuring leader election. LeaderElectionID string @@ -98,6 +102,10 @@ type Options struct { // endpoints. Password string + // KeepAliveDuration is the idle time interval in between send() requests + // in keepalive handler + KeepAliveDuration time.Duration + // WebhookPort is the port that the webhook server serves at. WebhookPort int diff --git a/pkg/services/govmomi/create_test.go b/pkg/services/govmomi/create_test.go index bdf69b9dd6..0ca5b4cee8 100644 --- a/pkg/services/govmomi/create_test.go +++ b/pkg/services/govmomi/create_test.go @@ -47,9 +47,10 @@ func TestCreate(t *testing.T) { vmContext.VSphereVM.Spec.Server = s.URL.Host authSession, err := session.GetOrCreate( - vmContext, - vmContext.VSphereVM.Spec.Server, "", - s.URL.User.Username(), pass, "") + vmContext.Context, + session.NewParams(). + WithServer(vmContext.VSphereVM.Spec.Server). + WithUserInfo(s.URL.User.Username(), pass)) if err != nil { t.Fatal(err) } diff --git a/pkg/services/govmomi/vcenter/clone_test.go b/pkg/services/govmomi/vcenter/clone_test.go index 020d166905..e9a8aba5c9 100644 --- a/pkg/services/govmomi/vcenter/clone_test.go +++ b/pkg/services/govmomi/vcenter/clone_test.go @@ -143,8 +143,9 @@ func initSimulator(t *testing.T) (*simulator.Model, *session.Session, *simulator authSession, err := session.GetOrCreate( ctx.TODO(), - server.URL.Host, "", - server.URL.User.Username(), pass, "") + session.NewParams(). + WithServer(server.URL.Host). 
+ WithUserInfo(server.URL.User.Username(), pass)) if err != nil { t.Fatal(err) } diff --git a/pkg/session/session.go b/pkg/session/session.go index 74212ab9ed..8899eef196 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -20,17 +20,21 @@ import ( "context" "net/url" "sync" + "time" + "github.com/go-logr/logr" "github.com/pkg/errors" "github.com/vmware/govmomi" "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/session" "github.com/vmware/govmomi/vim25" + "github.com/vmware/govmomi/vim25/methods" "github.com/vmware/govmomi/vim25/soap" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/cluster-api-provider-vsphere/api/v1alpha4" + "sigs.k8s.io/cluster-api-provider-vsphere/pkg/constants" ) var sessionCache = map[string]Session{} @@ -43,33 +47,86 @@ type Session struct { datacenter *object.Datacenter } +type Feature struct { + EnableKeepAlive bool + KeepAliveDuration time.Duration +} + +func DefaultFeature() Feature { + return Feature{ + EnableKeepAlive: constants.DefaultEnableKeepAlive, + } +} + +type Params struct { + server string + datacenter string + userinfo *url.Userinfo + thumbprint string + feature Feature +} + +func NewParams() *Params { + return &Params{ + feature: DefaultFeature(), + } +} + +func (p *Params) WithServer(server string) *Params { + p.server = server + return p +} + +func (p *Params) WithDatacenter(datacenter string) *Params { + p.datacenter = datacenter + return p +} + +func (p *Params) WithUserInfo(username, password string) *Params { + p.userinfo = url.UserPassword(username, password) + return p +} + +func (p *Params) WithThumbprint(thumbprint string) *Params { + p.thumbprint = thumbprint + return p +} + +func (p *Params) WithFeatures(feature Feature) *Params { + p.feature = feature + return p +} + // GetOrCreate gets a cached session or creates a new one if one does not // already exist. 
-func GetOrCreate( - ctx context.Context, - server, datacenter, username, password string, thumbprint string) (*Session, error) { - logger := ctrl.LoggerFrom(ctx) +func GetOrCreate(ctx context.Context, params *Params) (*Session, error) { + logger := ctrl.LoggerFrom(ctx).WithName("session") sessionMU.Lock() defer sessionMU.Unlock() - sessionKey := server + username + datacenter - if cachedSession, ok := sessionCache[sessionKey]; ok { - if ok, _ := cachedSession.SessionManager.SessionIsActive(ctx); ok { - logger.V(2).Info("found active cached vSphere client session", "server", server, "datacenter", datacenter) - return &cachedSession, nil + sessionKey := params.server + params.userinfo.Username() + params.datacenter + if session, ok := sessionCache[sessionKey]; ok { + // if keepalive is enabled we depend upon roundtripper to reestablish the connection + // and remove the key if it could not + if params.feature.EnableKeepAlive { + return &session, nil + } + if ok, _ := session.SessionManager.SessionIsActive(ctx); ok { + logger.V(2).Info("found active cached vSphere client session", "server", params.server, "datacenter", params.datacenter) + return &session, nil } } - soapURL, err := soap.ParseURL(server) + soapURL, err := soap.ParseURL(params.server) if err != nil { - return nil, errors.Wrapf(err, "error parsing vSphere URL %q", server) + return nil, errors.Wrapf(err, "error parsing vSphere URL %q", params.server) } if soapURL == nil { - return nil, errors.Errorf("error parsing vSphere URL %q", server) + return nil, errors.Errorf("error parsing vSphere URL %q", params.server) } - soapURL.User = url.UserPassword(username, password) - client, err := newClient(ctx, soapURL, thumbprint) + soapURL.User = params.userinfo + client, err := newClient(ctx, logger, sessionKey, soapURL, params.thumbprint, params.feature) if err != nil { return nil, err } @@ -81,9 +138,9 @@ func GetOrCreate( session.Finder = find.NewFinder(session.Client.Client, false) // Assign the datacenter if 
one was specified. - dc, err := session.Finder.DatacenterOrDefault(ctx, datacenter) + dc, err := session.Finder.DatacenterOrDefault(ctx, params.datacenter) if err != nil { - return nil, errors.Wrapf(err, "unable to find datacenter %q", datacenter) + return nil, errors.Wrapf(err, "unable to find datacenter %q", params.datacenter) } session.datacenter = dc session.Finder.SetDatacenter(dc) @@ -91,12 +148,12 @@ func GetOrCreate( // Cache the session. sessionCache[sessionKey] = session - logger.V(2).Info("cached vSphere client session", "server", server, "datacenter", datacenter) + logger.V(2).Info("cached vSphere client session", "server", params.server, "datacenter", params.datacenter) return &session, nil } -func newClient(ctx context.Context, url *url.URL, thumprint string) (*govmomi.Client, error) { +func newClient(ctx context.Context, logger logr.Logger, sessionKey string, url *url.URL, thumprint string, feature Feature) (*govmomi.Client, error) { insecure := thumprint == "" soapClient := soap.NewClient(url, insecure) if !insecure { @@ -107,10 +164,29 @@ func newClient(ctx context.Context, url *url.URL, thumprint string) (*govmomi.Cl if err != nil { return nil, err } + c := &govmomi.Client{ Client: vimClient, SessionManager: session.NewManager(vimClient), } + + if feature.EnableKeepAlive { + vimClient.RoundTripper = session.KeepAliveHandler(vimClient.RoundTripper, feature.KeepAliveDuration, func(tripper soap.RoundTripper) error { + // We tried implementing + // c.Login here, but once the client is logged out it + // keeps erroring with invalid username or password. + // We tried with the username and password cached in the session, but the error persisted, + // hence we just clear the cache and expect the client to + // be recreated in the next GetOrCreate call. + _, err := methods.GetCurrentTime(ctx, tripper) + if err != nil { + logger.Error(err, "failed to keep alive govmomi client") + clearCache(sessionKey) + } + return err + }) + } + if err := c.Login(ctx, url.User); err != nil { 
return nil, err } @@ -118,6 +194,12 @@ func newClient(ctx context.Context, url *url.URL, thumprint string) (*govmomi.Cl return c, nil } +func clearCache(sessionKey string) { + sessionMU.Lock() + defer sessionMU.Unlock() + delete(sessionCache, sessionKey) +} + // FindByBIOSUUID finds an object by its BIOS UUID. // // To avoid comments about this function's name, please see the Golang