From 4805e6e6f6a1a2162748ad0a07cdead8c20dd226 Mon Sep 17 00:00:00 2001
From: Davide Agnello
Date: Sat, 15 Oct 2016 10:22:38 -0700
Subject: [PATCH] vSphere cloud provider: re-use session for vCenter logins

Resolves #34491
---
 .../providers/vsphere/vsphere.go      | 214 ++++++++++--------
 .../providers/vsphere/vsphere_test.go |   4 +-
 2 files changed, 125 insertions(+), 93 deletions(-)

diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go
index 1ecb49ccd03..c65ed5e8219 100644
--- a/pkg/cloudprovider/providers/vsphere/vsphere.go
+++ b/pkg/cloudprovider/providers/vsphere/vsphere.go
@@ -24,7 +24,9 @@ import (
 	"net/url"
 	"path"
 	"path/filepath"
+	"runtime"
 	"strings"
+	"sync"
 
 	"gopkg.in/gcfg.v1"
 
@@ -33,6 +35,8 @@ import (
 	"github.com/vmware/govmomi/find"
 	"github.com/vmware/govmomi/object"
 	"github.com/vmware/govmomi/property"
+	"github.com/vmware/govmomi/session"
+	"github.com/vmware/govmomi/vim25"
 	"github.com/vmware/govmomi/vim25/mo"
 	"github.com/vmware/govmomi/vim25/soap"
 	"github.com/vmware/govmomi/vim25/types"
@@ -41,7 +45,7 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	k8stypes "k8s.io/kubernetes/pkg/types"
-	"k8s.io/kubernetes/pkg/util/runtime"
+	k8runtime "k8s.io/kubernetes/pkg/util/runtime"
 )
 
 const (
@@ -61,6 +65,7 @@ const (
 	EagerZeroedThickDiskType = "eagerZeroedThick"
 	ZeroedThickDiskType      = "zeroedThick"
 	VolDir                   = "kubevols"
+	RoundTripperDefaultCount = 3
 )
 
 // Controller types that are currently supported for hot attach of disks
@@ -87,9 +92,12 @@ var ErrNoDevicesFound = errors.New("No devices found")
 var ErrNonSupportedControllerType = errors.New("Disk is attached to non-supported controller type")
 var ErrFileAlreadyExist = errors.New("File requested already exist")
 
+var clientLock sync.Mutex
+
 // VSphere is an implementation of cloud provider Interface for VSphere.
 type VSphere struct {
-	cfg *VSphereConfig
+	client *govmomi.Client
+	cfg    *VSphereConfig
 	// InstanceID of the server where this VSphere object is instantiated.
 	localInstanceID string
 	// Cluster that VirtualMachine belongs to
@@ -114,6 +122,8 @@ type VSphereConfig struct {
 		Datastore string `gcfg:"datastore"`
 		// WorkingDir is path where VMs can be found.
 		WorkingDir string `gcfg:"working-dir"`
+		// Soap round tripper count (retries = RoundTripper - 1)
+		RoundTripperCount uint `gcfg:"soap-roundtrip-count"`
 	}
 
 	Network struct {
@@ -190,7 +200,7 @@ func init() {
 // Returns the name of the VM and its Cluster on which this code is running.
 // This is done by searching for the name of virtual machine by current IP.
 // Prerequisite: this code assumes VMWare vmtools or open-vm-tools to be installed in the VM.
-func readInstance(cfg *VSphereConfig) (string, string, error) {
+func readInstance(client *govmomi.Client, cfg *VSphereConfig) (string, string, error) {
 	addrs, err := net.InterfaceAddrs()
 	if err != nil {
 		return "", "", err
@@ -204,15 +214,8 @@ func readInstance(cfg *VSphereConfig) (string, string, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// Create vSphere client
-	c, err := vsphereLogin(cfg, ctx)
-	if err != nil {
-		return "", "", err
-	}
-	defer c.Logout(ctx)
-
 	// Create a new finder
-	f := find.NewFinder(c.Client, true)
+	f := find.NewFinder(client.Client, true)
 
 	// Fetch and set data center
 	dc, err := f.Datacenter(ctx, cfg.Global.Datacenter)
@@ -221,7 +224,7 @@ func readInstance(cfg *VSphereConfig) (string, string, error) {
 	}
 	f.SetDatacenter(dc)
 
-	s := object.NewSearchIndex(c.Client)
+	s := object.NewSearchIndex(client.Client)
 
 	var svm object.Reference
 	for _, v := range addrs {
@@ -267,11 +270,6 @@ func readInstance(cfg *VSphereConfig) (string, string, error) {
 }
 
 func newVSphere(cfg VSphereConfig) (*VSphere, error) {
-	id, cluster, err := readInstance(&cfg)
-	if err != nil {
-		return nil, err
-	}
-
 	if cfg.Disk.SCSIControllerType == "" {
 		cfg.Disk.SCSIControllerType = LSILogicSASControllerType
 	} else if !checkControllerSupported(cfg.Disk.SCSIControllerType) {
@@ -281,11 +279,28 @@ func newVSphere(cfg VSphereConfig) (*VSphere, error) {
 	if cfg.Global.WorkingDir != "" {
 		cfg.Global.WorkingDir = path.Clean(cfg.Global.WorkingDir) + "/"
 	}
+	if cfg.Global.RoundTripperCount == 0 {
+		cfg.Global.RoundTripperCount = RoundTripperDefaultCount
+	}
+
+	c, err := newClient(&cfg, context.TODO())
+	if err != nil {
+		return nil, err
+	}
+
+	id, cluster, err := readInstance(c, &cfg)
+	if err != nil {
+		return nil, err
+	}
+
 	vs := VSphere{
+		client:          c,
 		cfg:             &cfg,
 		localInstanceID: id,
 		clusterName:     cluster,
 	}
+	runtime.SetFinalizer(&vs, logout)
+
 	return &vs, nil
 }
 
@@ -299,9 +314,11 @@ func checkControllerSupported(ctrlType string) bool {
 	return false
 }
 
-// Returns a client which communicates with vCenter.
-// This client can used to perform further vCenter operations.
-func vsphereLogin(cfg *VSphereConfig, ctx context.Context) (*govmomi.Client, error) {
+func logout(vs *VSphere) {
+	vs.client.Logout(context.TODO())
+}
+
+func newClient(cfg *VSphereConfig, ctx context.Context) (*govmomi.Client, error) {
 	// Parse URL from string
 	u, err := url.Parse(fmt.Sprintf("https://%s:%s/sdk", cfg.Global.VCenterIP, cfg.Global.VCenterPort))
 	if err != nil {
@@ -316,9 +333,44 @@ func vsphereLogin(cfg *VSphereConfig, ctx context.Context) (*govmomi.Client, err
 		return nil, err
 	}
 
+	// Add retry functionality
+	c.RoundTripper = vim25.Retry(c.RoundTripper, vim25.TemporaryNetworkError(int(cfg.Global.RoundTripperCount)))
+
 	return c, nil
 }
 
+// Ensures that the VSphere client is logged in to vCenter and that its
+// session is still valid; a new client session is created when necessary.
+func vSphereLogin(vs *VSphere, ctx context.Context) error {
+	var err error
+	clientLock.Lock()
+	defer clientLock.Unlock()
+	if vs.client == nil {
+		vs.client, err = newClient(vs.cfg, ctx)
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+
+	m := session.NewManager(vs.client.Client)
+	// retrieve client's current session
+	u, err := m.UserSession(ctx)
+	if err == nil && u != nil {
+		// current session is valid
+		return nil
+	}
+
+	glog.Warningf("Creating new client session since the existing session is not valid or not authenticated")
+	vs.client.Logout(ctx)
+	vs.client, err = newClient(vs.cfg, ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // Returns vSphere object `virtual machine` by its name.
 func getVirtualMachineByName(cfg *VSphereConfig, ctx context.Context, c *govmomi.Client, nodeName k8stypes.NodeName) (*object.VirtualMachine, error) {
 	name := nodeNameToVMName(nodeName)
@@ -399,26 +451,28 @@ func getInstances(cfg *VSphereConfig, ctx context.Context, c *govmomi.Client, fi
 }
 
 type Instances struct {
+	client          *govmomi.Client
 	cfg             *VSphereConfig
 	localInstanceID string
 }
 
 // Instances returns an implementation of Instances for vSphere.
 func (vs *VSphere) Instances() (cloudprovider.Instances, bool) {
-	return &Instances{vs.cfg, vs.localInstanceID}, true
+	// Ensure client is logged in and session is valid
+	err := vSphereLogin(vs, context.TODO())
+	if err != nil {
+		glog.Errorf("Failed to login into vCenter - %v", err)
+		return nil, false
+	}
+	return &Instances{vs.client, vs.cfg, vs.localInstanceID}, true
 }
 
 // List returns names of VMs (inside vm folder) by applying filter and which are currently running.
 func (i *Instances) List(filter string) ([]k8stypes.NodeName, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	c, err := vsphereLogin(i.cfg, ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer c.Logout(ctx)
 
-	vmList, err := getInstances(i.cfg, ctx, c, filter)
+	vmList, err := getInstances(i.cfg, ctx, i.client, filter)
 	if err != nil {
 		return nil, err
 	}
@@ -441,20 +495,13 @@ func (i *Instances) NodeAddresses(nodeName k8stypes.NodeName) ([]api.NodeAddress
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// Create vSphere client
-	c, err := vsphereLogin(i.cfg, ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer c.Logout(ctx)
-
-	vm, err := getVirtualMachineByName(i.cfg, ctx, c, nodeName)
+	vm, err := getVirtualMachineByName(i.cfg, ctx, i.client, nodeName)
 	if err != nil {
 		return nil, err
 	}
 
 	var mvm mo.VirtualMachine
-	err = getVirtualMachineManagedObjectReference(ctx, c, vm, "guest.net", &mvm)
+	err = getVirtualMachineManagedObjectReference(ctx, i.client, vm, "guest.net", &mvm)
 	if err != nil {
 		return nil, err
 	}
@@ -503,14 +550,7 @@ func (i *Instances) ExternalID(nodeName k8stypes.NodeName) (string, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// Create vSphere client
-	c, err := vsphereLogin(i.cfg, ctx)
-	if err != nil {
-		return "", err
-	}
-	defer c.Logout(ctx)
-
-	vm, err := getVirtualMachineByName(i.cfg, ctx, c, nodeName)
+	vm, err := getVirtualMachineByName(i.cfg, ctx, i.client, nodeName)
 	if err != nil {
 		if _, ok := err.(*find.NotFoundError); ok {
 			return "", cloudprovider.InstanceNotFound
@@ -519,7 +559,7 @@ func (i *Instances) ExternalID(nodeName k8stypes.NodeName) (string, error) {
 	}
 
 	var mvm mo.VirtualMachine
-	err = getVirtualMachineManagedObjectReference(ctx, c, vm, "summary", &mvm)
+	err = getVirtualMachineManagedObjectReference(ctx, i.client, vm, "summary", &mvm)
 	if err != nil {
 		return "", err
 	}
@@ -543,14 +583,7 @@ func (i *Instances) InstanceID(nodeName k8stypes.NodeName) (string, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// Create vSphere client
-	c, err := vsphereLogin(i.cfg, ctx)
-	if err != nil {
-		return "", err
-	}
-	defer c.Logout(ctx)
-
-	vm, err := getVirtualMachineByName(i.cfg, ctx, c, nodeName)
+	vm, err := getVirtualMachineByName(i.cfg, ctx, i.client, nodeName)
 	if err != nil {
 		if _, ok := err.(*find.NotFoundError); ok {
 			return "", cloudprovider.InstanceNotFound
@@ -559,7 +592,7 @@ func (i *Instances) InstanceID(nodeName k8stypes.NodeName) (string, error) {
 	}
 
 	var mvm mo.VirtualMachine
-	err = getVirtualMachineManagedObjectReference(ctx, c, vm, "summary", &mvm)
+	err = getVirtualMachineManagedObjectReference(ctx, i.client, vm, "summary", &mvm)
 	if err != nil {
 		return "", err
 	}
@@ -597,13 +630,13 @@ func (vs *VSphere) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
 
 // Zones returns an implementation of Zones for Google vSphere.
 func (vs *VSphere) Zones() (cloudprovider.Zones, bool) {
-	glog.V(4).Info("Claiming to support Zones")
+	glog.V(1).Info("Claiming to support Zones")
 
 	return vs, true
 }
 
 func (vs *VSphere) GetZone() (cloudprovider.Zone, error) {
-	glog.V(4).Infof("Current datacenter is %v, cluster is %v", vs.cfg.Global.Datacenter, vs.clusterName)
+	glog.V(1).Infof("Current datacenter is %v, cluster is %v", vs.cfg.Global.Datacenter, vs.clusterName)
 
 	// The clusterName is determined from the VirtualMachine ManagedObjectReference during init
 	// If the VM is not created within a Cluster, this will return empty-string
@@ -679,12 +712,12 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (di
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// Create vSphere client
-	c, err := vsphereLogin(vs.cfg, ctx)
+	// Ensure client is logged in and session is valid
+	err = vSphereLogin(vs, ctx)
 	if err != nil {
+		glog.Errorf("Failed to login into vCenter - %v", err)
 		return "", "", err
 	}
-	defer c.Logout(ctx)
 
 	// Find virtual machine to attach disk to
 	var vSphereInstance string
@@ -696,15 +729,15 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (di
 	}
 
 	// Get VM device list
-	vm, vmDevices, ds, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
+	vm, vmDevices, ds, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance)
 	if err != nil {
 		return "", "", err
 	}
 
-	attached, _ := checkDiskAttached(vmDiskPath, vmDevices, dc, c)
+	attached, _ := checkDiskAttached(vmDiskPath, vmDevices, dc, vs.client)
 	if attached {
-		diskID, _ = getVirtualDiskID(vmDiskPath, vmDevices, dc, c)
-		diskUUID, _ = getVirtualDiskUUIDByPath(vmDiskPath, dc, c)
+		diskID, _ = getVirtualDiskID(vmDiskPath, vmDevices, dc, vs.client)
+		diskUUID, _ = getVirtualDiskUUIDByPath(vmDiskPath, dc, vs.client)
 		return diskID, diskUUID, nil
 	}
 
@@ -723,10 +756,10 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (di
 		// we reached the maximum number of controllers we can attach
 		return "", "", fmt.Errorf("SCSI Controller Limit of %d has been reached, cannot create another SCSI controller", SCSIControllerLimit)
 	}
-	glog.V(4).Infof("Creating a SCSI controller of %v type", diskControllerType)
+	glog.V(1).Infof("Creating a SCSI controller of %v type", diskControllerType)
 	newSCSIController, err := vmDevices.CreateSCSIController(diskControllerType)
 	if err != nil {
-		runtime.HandleError(fmt.Errorf("error creating new SCSI controller: %v", err))
%v", err)) + k8runtime.HandleError(fmt.Errorf("error creating new SCSI controller: %v", err)) return "", "", err } configNewSCSIController := newSCSIController.(types.BaseVirtualSCSIController).GetVirtualSCSIController() @@ -737,7 +770,7 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName k8stypes.NodeName) (di // add the scsi controller to virtual machine err = vm.AddDevice(context.TODO(), newSCSIController) if err != nil { - glog.V(3).Infof("cannot add SCSI controller to vm - %v", err) + glog.V(1).Infof("cannot add SCSI controller to vm - %v", err) // attempt clean up of scsi controller if vmDevices, err := vm.Device(ctx); err == nil { cleanUpController(newSCSIController, vmDevices, vm, ctx) @@ -894,13 +927,12 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Create vSphere client - c, err := vsphereLogin(vs.cfg, ctx) + // Ensure client is logged in and session is valid + err := vSphereLogin(vs, ctx) if err != nil { - glog.Errorf("Failed to create vSphere client. err: %s", err) + glog.Errorf("Failed to login into vCenter - %v", err) return false, err } - defer c.Logout(ctx) // Find VM to detach disk from var vSphereInstance string @@ -911,8 +943,7 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b vSphereInstance = nodeNameToVMName(nodeName) } - nodeExist, err := vs.NodeExists(c, nodeName) - + nodeExist, err := vs.NodeExists(vs.client, nodeName) if err != nil { glog.Errorf("Failed to check whether node exist. err: %s.", err) return false, err @@ -927,13 +958,13 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b } // Get VM device list - _, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance) + _, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance) if err != nil { glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err) return false, err } - attached, err := checkDiskAttached(volPath, vmDevices, dc, c) + attached, err := checkDiskAttached(volPath, vmDevices, dc, vs.client) return attached, err } @@ -1067,12 +1098,12 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Create vSphere client - c, err := vsphereLogin(vs.cfg, ctx) + // Ensure client is logged in and session is valid + err := vSphereLogin(vs, ctx) if err != nil { + glog.Errorf("Failed to login into vCenter - %v", err) return err } - defer c.Logout(ctx) // Find virtual machine to attach disk to var vSphereInstance string @@ -1083,7 +1114,7 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error vSphereInstance = nodeNameToVMName(nodeName) } - nodeExist, err := vs.NodeExists(c, nodeName) + nodeExist, err := vs.NodeExists(vs.client, nodeName) if err != nil { glog.Errorf("Failed to check whether node exist. 
err: %s.", err) @@ -1098,12 +1129,12 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error return nil } - vm, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance) + vm, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, vs.client, vSphereInstance) if err != nil { return err } - diskID, err := getVirtualDiskID(volPath, vmDevices, dc, c) + diskID, err := getVirtualDiskID(volPath, vmDevices, dc, vs.client) if err != nil { glog.Warningf("disk ID not found for %v ", volPath) return err @@ -1145,15 +1176,15 @@ func (vs *VSphere) CreateVolume(volumeOptions *VolumeOptions) (volumePath string ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Create vSphere client - c, err := vsphereLogin(vs.cfg, ctx) + // Ensure client is logged in and session is valid + err = vSphereLogin(vs, ctx) if err != nil { + glog.Errorf("Failed to login into vCenter - %v", err) return "", err } - defer c.Logout(ctx) // Create a new finder - f := find.NewFinder(c.Client, true) + f := find.NewFinder(vs.client.Client, true) // Fetch and set data center dc, err := f.Datacenter(ctx, vs.cfg.Global.Datacenter) @@ -1167,7 +1198,7 @@ func (vs *VSphere) CreateVolume(volumeOptions *VolumeOptions) (volumePath string // vmdks will be created inside kubevols directory kubeVolsPath := filepath.Clean(ds.Path(VolDir)) + "/" - err = makeDirectoryInDatastore(c, dc, kubeVolsPath, false) + err = makeDirectoryInDatastore(vs.client, dc, kubeVolsPath, false) if err != nil && err != ErrFileAlreadyExist { glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err) return "", err @@ -1175,8 +1206,9 @@ func (vs *VSphere) CreateVolume(volumeOptions *VolumeOptions) (volumePath string glog.V(4).Infof("Created dir with path as %+q", kubeVolsPath) vmDiskPath := kubeVolsPath + volumeOptions.Name + ".vmdk" + // Create a virtual disk manager - virtualDiskManager := object.NewVirtualDiskManager(c.Client) + virtualDiskManager := object.NewVirtualDiskManager(vs.client.Client) // Create specification for new virtual disk vmDiskSpec := &types.FileBackedVirtualDiskSpec{ @@ -1206,22 +1238,22 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Create vSphere client - c, err := vsphereLogin(vs.cfg, ctx) + // Ensure client is logged in and session is valid + err := vSphereLogin(vs, ctx) if err != nil { + glog.Errorf("Failed to login into vCenter - %v", err) return err } - defer c.Logout(ctx) // Create a new finder - f := find.NewFinder(c.Client, true) + f := find.NewFinder(vs.client.Client, true) // Fetch and set data center dc, err := f.Datacenter(ctx, vs.cfg.Global.Datacenter) f.SetDatacenter(dc) // Create a virtual disk manager - virtualDiskManager := object.NewVirtualDiskManager(c.Client) + virtualDiskManager := object.NewVirtualDiskManager(vs.client.Client) // Delete virtual disk task, err := virtualDiskManager.DeleteVirtualDisk(ctx, vmDiskPath, dc) diff --git a/pkg/cloudprovider/providers/vsphere/vsphere_test.go b/pkg/cloudprovider/providers/vsphere/vsphere_test.go index 5f0be5677e8..8f92fa22374 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere_test.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere_test.go @@ -118,11 +118,11 @@ func TestVSphereLogin(t *testing.T) { defer cancel() // Create vSphere client - c, err := vsphereLogin(vs.cfg, ctx) + err = vSphereLogin(vs, ctx) if err != nil { t.Errorf("Failed to create vSpere client: %s", err) } - defer c.Logout(ctx) + defer 
 }
 
 func TestZones(t *testing.T) {
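
Note (not part of the patch): the sketch below illustrates the session re-use pattern this change introduces — keep an existing govmomi client, check its session with session.Manager.UserSession, and only create a new client when the session is no longer authenticated. The helper name ensureSession, the vCenter URL, and the credentials are illustrative placeholders, not code from the repository.

// ensure_session_sketch.go — illustrative standalone sketch only.
package main

import (
	"context"
	"fmt"
	"net/url"

	"github.com/vmware/govmomi"
	"github.com/vmware/govmomi/session"
)

// ensureSession re-uses c when its vCenter session is still authenticated and
// creates a fresh client otherwise, mirroring what vSphereLogin does above.
func ensureSession(ctx context.Context, c *govmomi.Client, u *url.URL, insecure bool) (*govmomi.Client, error) {
	if c != nil {
		m := session.NewManager(c.Client)
		if us, err := m.UserSession(ctx); err == nil && us != nil {
			// The current session is still valid; keep using it.
			return c, nil
		}
		// Best-effort logout; the session may already have expired.
		c.Logout(ctx)
	}
	// govmomi.NewClient logs in with the credentials embedded in the URL.
	return govmomi.NewClient(ctx, u, insecure)
}

func main() {
	// Placeholder endpoint and credentials.
	u, _ := url.Parse("https://vcenter.example.com:443/sdk")
	u.User = url.UserPassword("user", "password")

	ctx := context.Background()
	c, err := ensureSession(ctx, nil, u, true)
	if err != nil {
		fmt.Println("login failed:", err)
		return
	}
	defer c.Logout(ctx)
	fmt.Println("connected to:", c.ServiceContent.About.FullName)
}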