From 79e1da68d2f93f8becc903f84f4da515989eaa69 Mon Sep 17 00:00:00 2001 From: rohitjogvmw Date: Wed, 15 Nov 2017 22:24:23 -0800 Subject: [PATCH] Updating vSphere Cloud Provider (VCP) to support k8s cluster spread across multiple ESXi clusters, datacenters or even vSphere vCenters - vsphere.conf (cloud-config) is now needed only on master node - VCP uses OS hostname and not vSphere inventory name - VCP is now resilient to VM inventory name change and VM migration --- pkg/cloudprovider/providers/vsphere/BUILD | 5 +- .../providers/vsphere/nodemanager.go | 295 +++ .../providers/vsphere/vclib/custom_errors.go | 2 + .../providers/vsphere/vclib/datacenter.go | 70 +- .../providers/vsphere/vclib/datastore.go | 11 + .../vsphere/vclib/diskmanagers/vdm.go | 8 +- .../vsphere/vclib/diskmanagers/virtualdisk.go | 6 +- .../vsphere/vclib/diskmanagers/vmdm.go | 2 +- .../providers/vsphere/vclib/pbm.go | 10 +- .../providers/vsphere/vclib/utils.go | 43 +- .../providers/vsphere/vclib/virtualmachine.go | 44 +- .../providers/vsphere/vsphere.go | 768 +++++++++++++----- .../providers/vsphere/vsphere_test.go | 19 +- .../providers/vsphere/vsphere_util.go | 341 ++++++-- pkg/volume/vsphere_volume/attacher.go | 2 +- .../e2e/storage/persistent_volumes-vsphere.go | 12 +- test/e2e/storage/pv_reclaimpolicy.go | 14 +- test/e2e/storage/pvc_label_selector.go | 5 +- test/e2e/storage/volumes.go | 7 +- test/e2e/storage/vsphere_scale.go | 6 +- test/e2e/storage/vsphere_statefulsets.go | 7 +- test/e2e/storage/vsphere_stress.go | 10 +- test/e2e/storage/vsphere_utils.go | 44 +- test/e2e/storage/vsphere_volume_cluster_ds.go | 7 +- test/e2e/storage/vsphere_volume_datastore.go | 2 +- test/e2e/storage/vsphere_volume_diskformat.go | 4 +- test/e2e/storage/vsphere_volume_fstype.go | 10 +- .../storage/vsphere_volume_master_restart.go | 9 +- .../storage/vsphere_volume_node_poweroff.go | 8 +- test/e2e/storage/vsphere_volume_ops_storm.go | 6 +- test/e2e/storage/vsphere_volume_perf.go | 7 +- test/e2e/storage/vsphere_volume_placement.go | 8 +- .../e2e/storage/vsphere_volume_vsan_policy.go | 6 +- 33 files changed, 1412 insertions(+), 386 deletions(-) create mode 100644 pkg/cloudprovider/providers/vsphere/nodemanager.go diff --git a/pkg/cloudprovider/providers/vsphere/BUILD b/pkg/cloudprovider/providers/vsphere/BUILD index f5b75b0c9e9..91de27e7adc 100644 --- a/pkg/cloudprovider/providers/vsphere/BUILD +++ b/pkg/cloudprovider/providers/vsphere/BUILD @@ -9,6 +9,7 @@ load( go_library( name = "go_default_library", srcs = [ + "nodemanager.go", "vsphere.go", "vsphere_util.go", ], @@ -21,13 +22,15 @@ go_library( "//pkg/controller:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/vmware/govmomi:go_default_library", - "//vendor/github.com/vmware/govmomi/object:go_default_library", "//vendor/github.com/vmware/govmomi/vim25:go_default_library", "//vendor/github.com/vmware/govmomi/vim25/mo:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", "//vendor/gopkg.in/gcfg.v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/pkg/cloudprovider/providers/vsphere/nodemanager.go b/pkg/cloudprovider/providers/vsphere/nodemanager.go new file mode 100644 index 00000000000..493ea61045e --- /dev/null +++ 
b/pkg/cloudprovider/providers/vsphere/nodemanager.go @@ -0,0 +1,295 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "fmt" + "github.com/golang/glog" + "golang.org/x/net/context" + "k8s.io/api/core/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib" + "strings" + "sync" +) + +// Stores info about the kubernetes node +type NodeInfo struct { + dataCenter *vclib.Datacenter + vm *vclib.VirtualMachine + vcServer string +} + +type NodeManager struct { + // TODO: replace map with concurrent map when k8s supports go v1.9 + + // Maps the VC server to VSphereInstance + vsphereInstanceMap map[string]*VSphereInstance + // Maps node name to node info. + nodeInfoMap map[string]*NodeInfo + // Maps node name to node structure + registeredNodes map[string]*v1.Node + + // Mutexes + registeredNodesLock sync.RWMutex + nodeInfoLock sync.RWMutex +} + +type NodeDetails struct { + NodeName string + vm *vclib.VirtualMachine +} + +// TODO: Make it configurable in vsphere.conf +const ( + POOL_SIZE = 8 + QUEUE_SIZE = POOL_SIZE * 10 +) + +func (nm *NodeManager) DiscoverNode(node *v1.Node) error { + type VmSearch struct { + vc string + datacenter *vclib.Datacenter + } + + var mutex = &sync.Mutex{} + var globalErrMutex = &sync.Mutex{} + var queueChannel chan *VmSearch + var wg sync.WaitGroup + var globalErr *error + + queueChannel = make(chan *VmSearch, QUEUE_SIZE) + nodeUUID := node.Status.NodeInfo.SystemUUID + vmFound := false + globalErr = nil + + setGlobalErr := func(err error) { + globalErrMutex.Lock() + globalErr = &err + globalErrMutex.Unlock() + } + + setVMFound := func(found bool) { + mutex.Lock() + vmFound = found + mutex.Unlock() + } + + getVMFound := func() bool { + mutex.Lock() + found := vmFound + mutex.Unlock() + return found + } + + go func() { + var datacenterObjs []*vclib.Datacenter + for vc, vsi := range nm.vsphereInstanceMap { + + found := getVMFound() + if found == true { + break + } + + // Create context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := vsi.conn.Connect(ctx) + if err != nil { + glog.V(4).Info("Discovering node error vc:", err) + setGlobalErr(err) + continue + } + + if vsi.cfg.Datacenters == "" { + datacenterObjs, err = vclib.GetAllDatacenter(ctx, vsi.conn) + if err != nil { + glog.V(4).Info("Discovering node error dc:", err) + setGlobalErr(err) + continue + } + } else { + datacenters := strings.Split(vsi.cfg.Datacenters, ",") + for _, dc := range datacenters { + dc = strings.TrimSpace(dc) + if dc == "" { + continue + } + datacenterObj, err := vclib.GetDatacenter(ctx, vsi.conn, dc) + if err != nil { + glog.V(4).Info("Discovering node error dc:", err) + setGlobalErr(err) + continue + } + datacenterObjs = append(datacenterObjs, datacenterObj) + } + } + + for _, datacenterObj := range datacenterObjs { + found := getVMFound() + if found == true { + break + } + + glog.V(4).Infof("Finding node %s in vc=%s and datacenter=%s", node.Name, 
vc, datacenterObj.Name()) + queueChannel <- &VmSearch{ + vc: vc, + datacenter: datacenterObj, + } + } + } + close(queueChannel) + }() + + for i := 0; i < POOL_SIZE; i++ { + go func() { + for res := range queueChannel { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + vm, err := res.datacenter.GetVMByUUID(ctx, nodeUUID) + if err != nil { + glog.V(4).Infof("Error %q while looking for vm=%+v in vc=%s and datacenter=%s", + err, node.Name, vm, res.vc, res.datacenter.Name()) + if err != vclib.ErrNoVMFound { + setGlobalErr(err) + } else { + glog.V(4).Infof("Did not find node %s in vc=%s and datacenter=%s. err: %v", + node.Name, res.vc, res.datacenter.Name(), err) + } + continue + } + if vm != nil { + glog.V(4).Infof("Found node %s as vm=%+v in vc=%s and datacenter=%s", + node.Name, vm, res.vc, res.datacenter.Name()) + + nodeInfo := &NodeInfo{dataCenter: res.datacenter, vm: vm, vcServer: res.vc} + nm.addNodeInfo(node.ObjectMeta.Name, nodeInfo) + for range queueChannel { + } + setVMFound(true) + break + } + } + wg.Done() + }() + wg.Add(1) + } + wg.Wait() + if vmFound { + return nil + } + if globalErr != nil { + return *globalErr + } + + glog.V(4).Infof("Discovery Node: %q vm not found", node.Name) + return vclib.ErrNoVMFound +} + +func (nm *NodeManager) RegisterNode(node *v1.Node) error { + nm.addNode(node) + nm.DiscoverNode(node) + return nil +} + +func (nm *NodeManager) UnRegisterNode(node *v1.Node) error { + nm.removeNode(node) + return nil +} + +func (nm *NodeManager) RediscoverNode(nodeName k8stypes.NodeName) error { + node, err := nm.GetNode(nodeName) + + if err != nil { + return err + } + return nm.DiscoverNode(&node) +} + +func (nm *NodeManager) GetNode(nodeName k8stypes.NodeName) (v1.Node, error) { + nm.registeredNodesLock.RLock() + node := nm.registeredNodes[convertToString(nodeName)] + nm.registeredNodesLock.RUnlock() + if node == nil { + return v1.Node{}, vclib.ErrNoVMFound + } + return *node, nil +} + +func (nm *NodeManager) addNode(node *v1.Node) { + nm.registeredNodesLock.Lock() + nm.registeredNodes[node.ObjectMeta.Name] = node + nm.registeredNodesLock.Unlock() +} + +func (nm *NodeManager) removeNode(node *v1.Node) { + nm.registeredNodesLock.Lock() + delete(nm.registeredNodes, node.ObjectMeta.Name) + nm.registeredNodesLock.Unlock() +} + +// GetNodeInfo returns the NodeInfo (datacenter, VM object and vCenter server IP address) for the given node. +// This method returns an error if it is unable to find the node's VM in the vCenters and datacenters listed in vsphere.conf. +// The NodeInfo returned may not be updated to reflect the VM's current location. 
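+// If the node is not found in the cache, GetNodeInfo triggers RediscoverNode before returning.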
+func (nm *NodeManager) GetNodeInfo(nodeName k8stypes.NodeName) (NodeInfo, error) { + getNodeInfo := func(nodeName k8stypes.NodeName) *NodeInfo { + nm.nodeInfoLock.RLock() + nodeInfo := nm.nodeInfoMap[convertToString(nodeName)] + nm.nodeInfoLock.RUnlock() + return nodeInfo + } + nodeInfo := getNodeInfo(nodeName) + if nodeInfo == nil { + err := nm.RediscoverNode(nodeName) + if err != nil { + glog.V(4).Infof("error %q node info for node %q not found", err, convertToString(nodeName)) + return NodeInfo{}, err + } + nodeInfo = getNodeInfo(nodeName) + } + return *nodeInfo, nil +} + +func (nm *NodeManager) GetNodeDetails() []NodeDetails { + nm.nodeInfoLock.RLock() + defer nm.nodeInfoLock.RUnlock() + var nodeDetails []NodeDetails + for nodeName, nodeInfo := range nm.nodeInfoMap { + nodeDetails = append(nodeDetails, NodeDetails{nodeName, nodeInfo.vm}) + } + return nodeDetails +} + +func (nm *NodeManager) addNodeInfo(nodeName string, nodeInfo *NodeInfo) { + nm.nodeInfoLock.Lock() + nm.nodeInfoMap[nodeName] = nodeInfo + nm.nodeInfoLock.Unlock() +} + +func (nm *NodeManager) GetVSphereInstance(nodeName k8stypes.NodeName) (VSphereInstance, error) { + nodeInfo, err := nm.GetNodeInfo(nodeName) + if err != nil { + glog.V(4).Infof("node info for node %q not found", convertToString(nodeName)) + return VSphereInstance{}, err + } + vsphereInstance := nm.vsphereInstanceMap[nodeInfo.vcServer] + if vsphereInstance == nil { + return VSphereInstance{}, fmt.Errorf("vSphereInstance for vc server %q not found while looking for node %q", nodeInfo.vcServer, convertToString(nodeName)) + } + return *vsphereInstance, nil +} diff --git a/pkg/cloudprovider/providers/vsphere/vclib/custom_errors.go b/pkg/cloudprovider/providers/vsphere/vclib/custom_errors.go index 391f328f426..6709c4cf21a 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/custom_errors.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/custom_errors.go @@ -25,6 +25,7 @@ const ( NoDevicesFoundErrMsg = "No devices found" DiskNotFoundErrMsg = "No vSphere disk ID found" InvalidVolumeOptionsErrMsg = "VolumeOptions verification failed" + NoVMFoundErrMsg = "No VM found" ) // Error constants @@ -34,4 +35,5 @@ var ( ErrNoDevicesFound = errors.New(NoDevicesFoundErrMsg) ErrNoDiskIDFound = errors.New(DiskNotFoundErrMsg) ErrInvalidVolumeOptions = errors.New(InvalidVolumeOptionsErrMsg) + ErrNoVMFound = errors.New(NoVMFoundErrMsg) ) diff --git a/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go b/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go index ebb54b94312..d325c72dfe1 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/datacenter.go @@ -49,6 +49,22 @@ func GetDatacenter(ctx context.Context, connection *VSphereConnection, datacente return &dc, nil } +// GetAllDatacenter returns all the DataCenter Objects +func GetAllDatacenter(ctx context.Context, connection *VSphereConnection) ([]*Datacenter, error) { + var dc []*Datacenter + finder := find.NewFinder(connection.GoVmomiClient.Client, true) + datacenters, err := finder.DatacenterList(ctx, "*") + if err != nil { + glog.Errorf("Failed to find the datacenter. 
err: %+v", err) + return nil, err + } + for _, datacenter := range datacenters { + dc = append(dc, &(Datacenter{datacenter})) + } + + return dc, nil +} + // GetVMByUUID gets the VM object from the given vmUUID func (dc *Datacenter) GetVMByUUID(ctx context.Context, vmUUID string) (*VirtualMachine, error) { s := object.NewSearchIndex(dc.Client()) @@ -60,7 +76,7 @@ func (dc *Datacenter) GetVMByUUID(ctx context.Context, vmUUID string) (*VirtualM } if svm == nil { glog.Errorf("Unable to find VM by UUID. VM UUID: %s", vmUUID) - return nil, fmt.Errorf("Failed to find VM by UUID: %s", vmUUID) + return nil, ErrNoVMFound } virtualMachine := VirtualMachine{object.NewVirtualMachine(dc.Client(), svm.Reference()), dc} return &virtualMachine, nil @@ -79,6 +95,41 @@ func (dc *Datacenter) GetVMByPath(ctx context.Context, vmPath string) (*VirtualM return &virtualMachine, nil } +// GetAllDatastores gets the datastore URL to DatastoreInfo map for all the datastores in +// the datacenter. +func (dc *Datacenter) GetAllDatastores(ctx context.Context) (map[string]*DatastoreInfo, error) { + finder := getFinder(dc) + datastores, err := finder.DatastoreList(ctx, "*") + if err != nil { + glog.Errorf("Failed to get all the datastores. err: %+v", err) + return nil, err + } + var dsList []types.ManagedObjectReference + for _, ds := range datastores { + dsList = append(dsList, ds.Reference()) + } + + var dsMoList []mo.Datastore + pc := property.DefaultCollector(dc.Client()) + properties := []string{DatastoreInfoProperty} + err = pc.Retrieve(ctx, dsList, properties, &dsMoList) + if err != nil { + glog.Errorf("Failed to get Datastore managed objects from datastore objects."+ + " dsObjList: %+v, properties: %+v, err: %v", dsList, properties, err) + return nil, err + } + + dsURLInfoMap := make(map[string]*DatastoreInfo) + for _, dsMo := range dsMoList { + dsURLInfoMap[dsMo.Info.GetDatastoreInfo().Url] = &DatastoreInfo{ + &Datastore{object.NewDatastore(dc.Client(), dsMo.Reference()), + dc}, + dsMo.Info.GetDatastoreInfo()} + } + glog.V(9).Infof("dsURLInfoMap : %+v", dsURLInfoMap) + return dsURLInfoMap, nil +} + // GetDatastoreByPath gets the Datastore object from the given vmDiskPath func (dc *Datacenter) GetDatastoreByPath(ctx context.Context, vmDiskPath string) (*Datastore, error) { datastorePathObj := new(object.DatastorePath) @@ -109,6 +160,23 @@ func (dc *Datacenter) GetDatastoreByName(ctx context.Context, name string) (*Dat return &datastore, nil } +// GetResourcePool gets the resource pool for the given path +func (dc *Datacenter) GetResourcePool(ctx context.Context, computePath string) (*object.ResourcePool, error) { + finder := getFinder(dc) + var computeResource *object.ComputeResource + var err error + if computePath == "" { + computeResource, err = finder.DefaultComputeResource(ctx) + } else { + computeResource, err = finder.ComputeResource(ctx, computePath) + } + if err != nil { + glog.Errorf("Failed to get the ResourcePool for computePath '%s'. 
err: %+v", computePath, err) + return nil, err + } + return computeResource.ResourcePool(ctx) +} + // GetFolderByPath gets the Folder Object from the given folder path // folderPath should be the full path to folder func (dc *Datacenter) GetFolderByPath(ctx context.Context, folderPath string) (*Folder, error) { diff --git a/pkg/cloudprovider/providers/vsphere/vclib/datastore.go b/pkg/cloudprovider/providers/vsphere/vclib/datastore.go index 1901af18909..8fba424bbd9 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/datastore.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/datastore.go @@ -17,6 +17,7 @@ limitations under the License. package vclib import ( + "fmt" "github.com/golang/glog" "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/property" @@ -32,6 +33,16 @@ type Datastore struct { Datacenter *Datacenter } +// DatastoreInfo is a structure to store the Datastore and it's Info. +type DatastoreInfo struct { + *Datastore + Info *types.DatastoreInfo +} + +func (di DatastoreInfo) String() string { + return fmt.Sprintf("Datastore: %+v, datastore URL: %s", di.Datastore, di.Info.Url) +} + // CreateDirectory creates the directory at location specified by directoryPath. // If the intermediate level folders do not exist, and the parameter createParents is true, all the non-existent folders are created. // directoryPath must be in the format "[vsanDatastore] kubevols" diff --git a/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vdm.go b/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vdm.go index 8d860b9d548..3e6d9b2ecd9 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vdm.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vdm.go @@ -70,13 +70,13 @@ func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vcl } // Delete implements Disk's Delete interface -func (diskManager virtualDiskManager) Delete(ctx context.Context, datastore *vclib.Datastore) error { +func (diskManager virtualDiskManager) Delete(ctx context.Context, datacenter *vclib.Datacenter) error { // Create a virtual disk manager - virtualDiskManager := object.NewVirtualDiskManager(datastore.Client()) - diskPath := vclib.RemoveClusterFromVDiskPath(diskManager.diskPath) + virtualDiskManager := object.NewVirtualDiskManager(datacenter.Client()) + diskPath := vclib.RemoveStorageClusterORFolderNameFromVDiskPath(diskManager.diskPath) requestTime := time.Now() // Delete virtual disk - task, err := virtualDiskManager.DeleteVirtualDisk(ctx, diskPath, datastore.Datacenter.Datacenter) + task, err := virtualDiskManager.DeleteVirtualDisk(ctx, diskPath, datacenter.Datacenter) if err != nil { glog.Errorf("Failed to delete virtual disk. 
err: %v", err) vclib.RecordvSphereMetric(vclib.APIDeleteVolume, requestTime, err) diff --git a/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/virtualdisk.go b/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/virtualdisk.go index fbe14b5fbbd..533f49ece30 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/virtualdisk.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/virtualdisk.go @@ -40,7 +40,7 @@ const ( // VirtualDiskProvider defines interfaces for creating disk type VirtualDiskProvider interface { Create(ctx context.Context, datastore *vclib.Datastore) (string, error) - Delete(ctx context.Context, datastore *vclib.Datastore) error + Delete(ctx context.Context, datacenter *vclib.Datacenter) error } // getDiskManager returns vmDiskManager or vdmDiskManager based on given volumeoptions @@ -75,6 +75,6 @@ func (virtualDisk *VirtualDisk) Create(ctx context.Context, datastore *vclib.Dat } // Delete gets appropriate disk manager and calls respective delete method -func (virtualDisk *VirtualDisk) Delete(ctx context.Context, datastore *vclib.Datastore) error { - return getDiskManager(virtualDisk, VirtualDiskDeleteOperation).Delete(ctx, datastore) +func (virtualDisk *VirtualDisk) Delete(ctx context.Context, datacenter *vclib.Datacenter) error { + return getDiskManager(virtualDisk, VirtualDiskDeleteOperation).Delete(ctx, datacenter) } diff --git a/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vmdm.go b/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vmdm.go index 62c7018c5cf..6942dffb7f8 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vmdm.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers/vmdm.go @@ -157,7 +157,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto return vmdisk.diskPath, nil } -func (vmdisk vmDiskManager) Delete(ctx context.Context, datastore *vclib.Datastore) error { +func (vmdisk vmDiskManager) Delete(ctx context.Context, datacenter *vclib.Datacenter) error { return fmt.Errorf("vmDiskManager.Delete is not supported") } diff --git a/pkg/cloudprovider/providers/vsphere/vclib/pbm.go b/pkg/cloudprovider/providers/vsphere/vclib/pbm.go index df749fb8966..5ec83c62b36 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/pbm.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/pbm.go @@ -85,7 +85,7 @@ func (pbmClient *PbmClient) IsDatastoreCompatible(ctx context.Context, storagePo // GetCompatibleDatastores filters and returns compatible list of datastores for given storage policy id // For Non Compatible Datastores, fault message with the Datastore Name is also returned -func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, storagePolicyID string, datastores []*Datastore) ([]*Datastore, string, error) { +func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, dc *Datacenter, storagePolicyID string, datastores []*DatastoreInfo) ([]*DatastoreInfo, string, error) { var ( dsMorNameMap = getDsMorNameMap(ctx, datastores) localizedMessagesForNotCompatibleDatastores = "" @@ -96,7 +96,7 @@ func (pbmClient *PbmClient) GetCompatibleDatastores(ctx context.Context, storage return nil, "", err } compatibleHubs := compatibilityResult.CompatibleDatastores() - var compatibleDatastoreList []*Datastore + var compatibleDatastoreList []*DatastoreInfo for _, hub := range compatibleHubs { compatibleDatastoreList = append(compatibleDatastoreList, getDatastoreFromPlacementHub(datastores, hub)) } @@ -121,7 +121,7 @@ func (pbmClient *PbmClient) 
GetCompatibleDatastores(ctx context.Context, storage } // GetPlacementCompatibilityResult gets placement compatibility result based on storage policy requirements. -func (pbmClient *PbmClient) GetPlacementCompatibilityResult(ctx context.Context, storagePolicyID string, datastore []*Datastore) (pbm.PlacementCompatibilityResult, error) { +func (pbmClient *PbmClient) GetPlacementCompatibilityResult(ctx context.Context, storagePolicyID string, datastore []*DatastoreInfo) (pbm.PlacementCompatibilityResult, error) { var hubs []pbmtypes.PbmPlacementHub for _, ds := range datastore { hubs = append(hubs, pbmtypes.PbmPlacementHub{ @@ -145,7 +145,7 @@ func (pbmClient *PbmClient) GetPlacementCompatibilityResult(ctx context.Context, } // getDataStoreForPlacementHub returns matching datastore associated with given pbmPlacementHub -func getDatastoreFromPlacementHub(datastore []*Datastore, pbmPlacementHub pbmtypes.PbmPlacementHub) *Datastore { +func getDatastoreFromPlacementHub(datastore []*DatastoreInfo, pbmPlacementHub pbmtypes.PbmPlacementHub) *DatastoreInfo { for _, ds := range datastore { if ds.Reference().Type == pbmPlacementHub.HubType && ds.Reference().Value == pbmPlacementHub.HubId { return ds @@ -155,7 +155,7 @@ func getDatastoreFromPlacementHub(datastore []*Datastore, pbmPlacementHub pbmtyp } // getDsMorNameMap returns map of ds Mor and Datastore Object Name -func getDsMorNameMap(ctx context.Context, datastores []*Datastore) map[string]string { +func getDsMorNameMap(ctx context.Context, datastores []*DatastoreInfo) map[string]string { dsMorNameMap := make(map[string]string) for _, ds := range datastores { dsObjectName, err := ds.ObjectName(ctx) diff --git a/pkg/cloudprovider/providers/vsphere/vclib/utils.go b/pkg/cloudprovider/providers/vsphere/vclib/utils.go index 791d05d33da..bac429d6deb 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/utils.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/utils.go @@ -25,6 +25,8 @@ import ( "github.com/golang/glog" "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/vim25/mo" + "github.com/vmware/govmomi/vim25/soap" "github.com/vmware/govmomi/vim25/types" ) @@ -121,10 +123,10 @@ func getSCSIControllers(vmDevices object.VirtualDeviceList) []*types.VirtualCont return scsiControllers } -// RemoveClusterFromVDiskPath removes the cluster or folder path from the vDiskPath +// RemoveStorageClusterORFolderNameFromVDiskPath removes the cluster or folder path from the vDiskPath // for vDiskPath [DatastoreCluster/sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk, return value is [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk // for vDiskPath [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk, return value remains same [sharedVmfs-0] kubevols/e2e-vmdk-1234.vmdk -func RemoveClusterFromVDiskPath(vDiskPath string) string { +func RemoveStorageClusterORFolderNameFromVDiskPath(vDiskPath string) string { datastore := regexp.MustCompile("\\[(.*?)\\]").FindStringSubmatch(vDiskPath)[1] if filepath.Base(datastore) != datastore { vDiskPath = strings.Replace(vDiskPath, datastore, filepath.Base(datastore), 1) @@ -172,3 +174,40 @@ func IsValidUUID(uuid string) bool { r := regexp.MustCompile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$") return r.MatchString(uuid) } + +// IsManagedObjectNotFoundError returns true if error is of type ManagedObjectNotFound +func IsManagedObjectNotFoundError(err error) bool { + isManagedObjectNotFoundError := false + if soap.IsSoapFault(err) { + _, isManagedObjectNotFoundError = 
soap.ToSoapFault(err).VimFault().(types.ManagedObjectNotFound) + } + return isManagedObjectNotFoundError +} + +// VerifyVolumePathsForVM verifies if the volume paths (volPaths) are attached to VM. +func VerifyVolumePathsForVM(vmMo mo.VirtualMachine, volPaths []string, nodeName string, nodeVolumeMap map[string]map[string]bool) { + // Verify if the volume paths are present on the VM backing virtual disk devices + vmDevices := object.VirtualDeviceList(vmMo.Config.Hardware.Device) + VerifyVolumePathsForVMDevices(vmDevices, volPaths, nodeName, nodeVolumeMap) + +} + +// VerifyVolumePathsForVMDevices verifies if the volume paths (volPaths) are attached to VM. +func VerifyVolumePathsForVMDevices(vmDevices object.VirtualDeviceList, volPaths []string, nodeName string, nodeVolumeMap map[string]map[string]bool) { + volPathsMap := make(map[string]bool) + for _, volPath := range volPaths { + volPathsMap[volPath] = true + } + // Verify if the volume paths are present on the VM backing virtual disk devices + for _, device := range vmDevices { + if vmDevices.TypeName(device) == "VirtualDisk" { + virtualDevice := device.GetVirtualDevice() + if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok { + if volPathsMap[backing.FileName] { + setNodeVolumeMap(nodeVolumeMap, backing.FileName, nodeName, true) + } + } + } + } + +} diff --git a/pkg/cloudprovider/providers/vsphere/vclib/virtualmachine.go b/pkg/cloudprovider/providers/vsphere/vclib/virtualmachine.go index 2796f6b6877..8077b5583e6 100644 --- a/pkg/cloudprovider/providers/vsphere/vclib/virtualmachine.go +++ b/pkg/cloudprovider/providers/vsphere/vclib/virtualmachine.go @@ -23,6 +23,7 @@ import ( "github.com/golang/glog" "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/property" "github.com/vmware/govmomi/vim25/mo" "github.com/vmware/govmomi/vim25/types" ) @@ -63,7 +64,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol return "", fmt.Errorf("Not a valid SCSI Controller Type. Valid options are %q", SCSIControllerTypeValidOptions()) } vmDiskPathCopy := vmDiskPath - vmDiskPath = RemoveClusterFromVDiskPath(vmDiskPath) + vmDiskPath = RemoveStorageClusterORFolderNameFromVDiskPath(vmDiskPath) attached, err := vm.IsDiskAttached(ctx, vmDiskPath) if err != nil { glog.Errorf("Error occurred while checking if disk is attached on VM: %q. vmDiskPath: %q, err: %+v", vm.InventoryPath, vmDiskPath, err) @@ -75,6 +76,20 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol return diskUUID, nil } + if volumeOptions.StoragePolicyName != "" { + pbmClient, err := NewPbmClient(ctx, vm.Client()) + if err != nil { + glog.Errorf("Error occurred while creating new pbmClient. err: %+v", err) + return "", err + } + + volumeOptions.StoragePolicyID, err = pbmClient.ProfileIDByName(ctx, volumeOptions.StoragePolicyName) + if err != nil { + glog.Errorf("Failed to get Profile ID by name: %s. err: %+v", volumeOptions.StoragePolicyName, err) + return "", err + } + } + dsObj, err := vm.Datacenter.GetDatastoreByPath(ctx, vmDiskPathCopy) if err != nil { glog.Errorf("Failed to get datastore from vmDiskPath: %q. 
err: %+v", vmDiskPath, err) @@ -139,7 +154,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol // DetachDisk detaches the disk specified by vmDiskPath func (vm *VirtualMachine) DetachDisk(ctx context.Context, vmDiskPath string) error { - vmDiskPath = RemoveClusterFromVDiskPath(vmDiskPath) + vmDiskPath = RemoveStorageClusterORFolderNameFromVDiskPath(vmDiskPath) device, err := vm.getVirtualDeviceByPath(ctx, vmDiskPath) if err != nil { glog.Errorf("Disk ID not found for VM: %q with diskPath: %q", vm.InventoryPath, vmDiskPath) @@ -186,7 +201,7 @@ func (vm *VirtualMachine) IsActive(ctx context.Context) (bool, error) { } // GetAllAccessibleDatastores gets the list of accessible Datastores for the given Virtual Machine -func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*Datastore, error) { +func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*DatastoreInfo, error) { host, err := vm.HostSystem(ctx) if err != nil { glog.Errorf("Failed to get host system for VM: %q. err: %+v", vm.InventoryPath, err) @@ -199,9 +214,28 @@ func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*Da glog.Errorf("Failed to retrieve datastores for host: %+v. err: %+v", host, err) return nil, err } - var dsObjList []*Datastore + var dsRefList []types.ManagedObjectReference for _, dsRef := range hostSystemMo.Datastore { - dsObjList = append(dsObjList, &Datastore{object.NewDatastore(vm.Client(), dsRef), vm.Datacenter}) + dsRefList = append(dsRefList, dsRef) + } + + var dsMoList []mo.Datastore + pc := property.DefaultCollector(vm.Client()) + properties := []string{DatastoreInfoProperty} + err = pc.Retrieve(ctx, dsRefList, properties, &dsMoList) + if err != nil { + glog.Errorf("Failed to get Datastore managed objects from datastore objects."+ + " dsObjList: %+v, properties: %+v, err: %v", dsRefList, properties, err) + return nil, err + } + glog.V(9).Infof("Result dsMoList: %+v", dsMoList) + var dsObjList []*DatastoreInfo + for _, dsMo := range dsMoList { + dsObjList = append(dsObjList, + &DatastoreInfo{ + &Datastore{object.NewDatastore(vm.Client(), dsMo.Reference()), + vm.Datacenter}, + dsMo.Info.GetDatastoreInfo()}) } return dsObjList, nil } diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go index 31d2b64ec0e..77f80e23549 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net" + "os" "path" "path/filepath" "runtime" @@ -34,6 +35,9 @@ import ( "golang.org/x/net/context" "k8s.io/api/core/v1" k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib" @@ -47,7 +51,6 @@ const ( VolDir = "kubevols" RoundTripperDefaultCount = 3 DummyVMPrefixName = "vsphere-k8s" - VSANDatastoreType = "vsan" MacOuiVC = "00:50:56" MacOuiEsx = "00:0c:29" CleanUpDummyVMRoutineInterval = 5 @@ -58,25 +61,49 @@ const ( var cleanUpRoutineInitialized = false var datastoreFolderIDMap = make(map[string]map[string]string) -var clientLock sync.Mutex var cleanUpRoutineInitLock sync.Mutex var cleanUpDummyVMLock sync.RWMutex // VSphere is an implementation of cloud provider Interface for VSphere. 
type VSphere struct { - conn *vclib.VSphereConnection - cfg *VSphereConfig - // InstanceID of the server where this VSphere object is instantiated. - localInstanceID string + cfg *VSphereConfig + hostName string + // Maps the VSphere IP address to VSphereInstance + vsphereInstanceMap map[string]*VSphereInstance + // Responsible for managing discovery of k8s node, their location etc. + nodeManager *NodeManager } -// VSphereConfig information that is used by vSphere Cloud Provider to connect to VC +// Represents a vSphere instance where one or more kubernetes nodes are running. +type VSphereInstance struct { + conn *vclib.VSphereConnection + cfg *VirtualCenterConfig +} + +// Structure that represents Virtual Center configuration +type VirtualCenterConfig struct { + // vCenter username. + User string `gcfg:"user"` + // vCenter password in clear text. + Password string `gcfg:"password"` + // vCenter port. + VCenterPort string `gcfg:"port"` + // Datacenter in which VMs are located. + Datacenters string `gcfg:"datacenters"` + // Soap round tripper count (retries = RoundTripper - 1) + RoundTripperCount uint `gcfg:"soap-roundtrip-count"` +} + +// Structure that represents the content of vsphere.conf file. +// Users specify the configuration of one or more Virtual Centers in vsphere.conf where +// the Kubernetes master and worker nodes are running. type VSphereConfig struct { Global struct { // vCenter username. User string `gcfg:"user"` // vCenter password in clear text. Password string `gcfg:"password"` + // Deprecated. Use VirtualCenter to specify multiple vCenter Servers. // vCenter IP. VCenterIP string `gcfg:"server"` // vCenter port. @@ -84,23 +111,32 @@ type VSphereConfig struct { // True if vCenter uses self-signed cert. InsecureFlag bool `gcfg:"insecure-flag"` // Datacenter in which VMs are located. + // Deprecated. Use "datacenters" instead. Datacenter string `gcfg:"datacenter"` + // Datacenter in which VMs are located. + Datacenters string `gcfg:"datacenters"` // Datastore in which vmdks are stored. - Datastore string `gcfg:"datastore"` - // WorkingDir is path where VMs can be found. + // Deprecated. See Workspace.DefaultDatastore + DefaultDatastore string `gcfg:"datastore"` + // WorkingDir is path where VMs can be found. Also used to create dummy VMs. + // Deprecated. WorkingDir string `gcfg:"working-dir"` // Soap round tripper count (retries = RoundTripper - 1) RoundTripperCount uint `gcfg:"soap-roundtrip-count"` + // Deprecated as the virtual machines will be automatically discovered. // VMUUID is the VM Instance UUID of virtual machine which can be retrieved from instanceUuid // property in VmConfigInfo, or also set as vc.uuid in VMX file. // If not set, will be fetched from the machine via sysfs (requires root) VMUUID string `gcfg:"vm-uuid"` + // Deprecated as virtual machine will be automatically discovered. // VMName is the VM name of virtual machine // Combining the WorkingDir and VMName can form a unique InstanceID. // When vm-name is set, no username/password is required on worker nodes. VMName string `gcfg:"vm-name"` } + VirtualCenter map[string]*VirtualCenterConfig + Network struct { // PublicNetwork is name of the network the VMs are joined to. PublicNetwork string `gcfg:"public-network"` @@ -110,12 +146,21 @@ type VSphereConfig struct { // SCSIControllerType defines SCSI controller to be used. 
SCSIControllerType string `dcfg:"scsicontrollertype"` } + + // Endpoint used to create volumes + Workspace struct { + VCenterIP string `gcfg:"server"` + Datacenter string `gcfg:"datacenter"` + Folder string `gcfg:"folder"` + DefaultDatastore string `gcfg:"default-datastore"` + ResourcePoolPath string `gcfg:"resourcepool-path"` + } } type Volumes interface { // AttachDisk attaches given disk to given node. Current node // is used when nodeName is empty string. - AttachDisk(vmDiskPath string, storagePolicyID string, nodeName k8stypes.NodeName) (diskUUID string, err error) + AttachDisk(vmDiskPath string, storagePolicyName string, nodeName k8stypes.NodeName) (diskUUID string, err error) // DetachDisk detaches given disk to given node. Current node // is used when nodeName is empty string. @@ -152,19 +197,169 @@ func readConfig(config io.Reader) (VSphereConfig, error) { func init() { vclib.RegisterMetrics() cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { + // If vSphere.conf file is not present then it is worker node. + if config == nil { + return newWorkerNode() + } cfg, err := readConfig(config) if err != nil { return nil, err } - return newVSphere(cfg) + return newControllerNode(cfg) }) } // Initialize passes a Kubernetes clientBuilder interface to the cloud provider -func (vs *VSphere) Initialize(clientBuilder controller.ControllerClientBuilder) {} +func (vs *VSphere) Initialize(clientBuilder controller.ControllerClientBuilder) { + if vs.cfg == nil { + return + } -func newVSphere(cfg VSphereConfig) (*VSphere, error) { + // Only on controller node it is required to register listeners. + // Register callbacks for node updates + client := clientBuilder.ClientOrDie("vSphere-cloud-provider") + factory := informers.NewSharedInformerFactory(client, 5*time.Minute) + nodeInformer := factory.Core().V1().Nodes() + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: vs.NodeAdded, + DeleteFunc: vs.NodeDeleted, + }) + go nodeInformer.Informer().Run(wait.NeverStop) + glog.V(4).Infof("vSphere cloud provider initialized") +} + +// Creates new worker node interface and returns +func newWorkerNode() (*VSphere, error) { var err error + vs := VSphere{} + vs.hostName, err = os.Hostname() + if err != nil { + glog.Errorf("Failed to get hostname. err: %+v", err) + return nil, err + } + + return &vs, nil +} + +func populateVsphereInstanceMap(cfg *VSphereConfig) (map[string]*VSphereInstance, error) { + vsphereInstanceMap := make(map[string]*VSphereInstance) + + // Check if the vsphere.conf is in old format. In this + // format the cfg.VirtualCenter will be nil or empty. 
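+ // In the old format the [Global] section describes a single vCenter; its values are copied
+ // into one VirtualCenterConfig entry and into the Workspace fields (the endpoint used to create volumes).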
+ if cfg.VirtualCenter == nil || len(cfg.VirtualCenter) == 0 { + glog.V(4).Infof("Config is not per virtual center and is in old format.") + if cfg.Global.User == "" { + glog.Error("Global.User is empty!") + return nil, errors.New("Global.User is empty!") + } + if cfg.Global.Password == "" { + glog.Error("Global.Password is empty!") + return nil, errors.New("Global.Password is empty!") + } + if cfg.Global.WorkingDir == "" { + glog.Error("Global.WorkingDir is empty!") + return nil, errors.New("Global.WorkingDir is empty!") + } + if cfg.Global.VCenterIP == "" { + glog.Error("Global.VCenterIP is empty!") + return nil, errors.New("Global.VCenterIP is empty!") + } + if cfg.Global.Datacenter == "" { + glog.Error("Global.Datacenter is empty!") + return nil, errors.New("Global.Datacenter is empty!") + } + cfg.Workspace.VCenterIP = cfg.Global.VCenterIP + cfg.Workspace.Datacenter = cfg.Global.Datacenter + cfg.Workspace.Folder = cfg.Global.WorkingDir + cfg.Workspace.DefaultDatastore = cfg.Global.DefaultDatastore + + vcConfig := VirtualCenterConfig{ + User: cfg.Global.User, + Password: cfg.Global.Password, + VCenterPort: cfg.Global.VCenterPort, + Datacenters: cfg.Global.Datacenter, + RoundTripperCount: cfg.Global.RoundTripperCount, + } + + vSphereConn := vclib.VSphereConnection{ + Username: vcConfig.User, + Password: vcConfig.Password, + Hostname: cfg.Global.VCenterIP, + Insecure: cfg.Global.InsecureFlag, + RoundTripperCount: vcConfig.RoundTripperCount, + Port: vcConfig.VCenterPort, + } + vsphereIns := VSphereInstance{ + conn: &vSphereConn, + cfg: &vcConfig, + } + vsphereInstanceMap[cfg.Global.VCenterIP] = &vsphereIns + } else { + if cfg.Workspace.VCenterIP == "" || cfg.Workspace.Folder == "" || cfg.Workspace.Datacenter == "" { + msg := fmt.Sprintf("All fields in workspace are mandatory."+ + " vsphere.conf does not have the workspace specified correctly. cfg.Workspace: %+v", cfg.Workspace) + glog.Error(msg) + return nil, errors.New(msg) + } + for vcServer, vcConfig := range cfg.VirtualCenter { + glog.V(4).Infof("Initializing vc server %s", vcServer) + if vcServer == "" { + glog.Error("vsphere.conf does not have the VirtualCenter IP address specified") + return nil, errors.New("vsphere.conf does not have the VirtualCenter IP address specified") + } + if vcConfig.User == "" { + vcConfig.User = cfg.Global.User + } + if vcConfig.Password == "" { + vcConfig.Password = cfg.Global.Password + } + if vcConfig.User == "" { + msg := fmt.Sprintf("vcConfig.User is empty for vc %s!", vcServer) + glog.Error(msg) + return nil, errors.New(msg) + } + if vcConfig.Password == "" { + msg := fmt.Sprintf("vcConfig.Password is empty for vc %s!", vcServer) + glog.Error(msg) + return nil, errors.New(msg) + } + if vcConfig.VCenterPort == "" { + vcConfig.VCenterPort = cfg.Global.VCenterPort + } + if vcConfig.Datacenters == "" { + if cfg.Global.Datacenters != "" { + vcConfig.Datacenters = cfg.Global.Datacenters + } else { + // cfg.Global.Datacenter is deprecated, so giving it the last preference. 
+ vcConfig.Datacenters = cfg.Global.Datacenter + } + } + if vcConfig.RoundTripperCount == 0 { + vcConfig.RoundTripperCount = cfg.Global.RoundTripperCount + } + + vSphereConn := vclib.VSphereConnection{ + Username: vcConfig.User, + Password: vcConfig.Password, + Hostname: vcServer, + Insecure: cfg.Global.InsecureFlag, + RoundTripperCount: vcConfig.RoundTripperCount, + Port: vcConfig.VCenterPort, + } + vsphereIns := VSphereInstance{ + conn: &vSphereConn, + cfg: vcConfig, + } + vsphereInstanceMap[vcServer] = &vsphereIns + } + } + return vsphereInstanceMap, nil +} + +// Creates new Contreoller node interface and returns +func newControllerNode(cfg VSphereConfig) (*VSphere, error) { + var err error + if cfg.Disk.SCSIControllerType == "" { cfg.Disk.SCSIControllerType = vclib.PVSCSIControllerType } else if !vclib.CheckControllerSupported(cfg.Disk.SCSIControllerType) { @@ -188,56 +383,37 @@ func newVSphere(cfg VSphereConfig) (*VSphere, error) { return nil, err } } - vSphereConn := vclib.VSphereConnection{ - Username: cfg.Global.User, - Password: cfg.Global.Password, - Hostname: cfg.Global.VCenterIP, - Insecure: cfg.Global.InsecureFlag, - RoundTripperCount: cfg.Global.RoundTripperCount, - Port: cfg.Global.VCenterPort, + vsphereInstanceMap, err := populateVsphereInstanceMap(&cfg) + if err != nil { + return nil, err } - var instanceID string - if cfg.Global.VMName == "" { - // if VMName is not set in the cloud config file, each nodes (including worker nodes) need credentials to obtain VMName from vCenter - glog.V(4).Infof("Cannot find VMName from cloud config file, start obtaining it from vCenter") - // Create context - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - err = vSphereConn.Connect(ctx) - if err != nil { - glog.Errorf("Failed to connect to vSphere") - return nil, err - } - dc, err := vclib.GetDatacenter(ctx, &vSphereConn, cfg.Global.Datacenter) - if err != nil { - return nil, err - } - vm, err := dc.GetVMByUUID(ctx, cfg.Global.VMUUID) - if err != nil { - return nil, err - } - vmName, err := vm.ObjectName(ctx) - if err != nil { - return nil, err - } - instanceID = vmName - } else { - instanceID = cfg.Global.VMName - } vs := VSphere{ - conn: &vSphereConn, - cfg: &cfg, - localInstanceID: instanceID, + vsphereInstanceMap: vsphereInstanceMap, + nodeManager: &NodeManager{ + vsphereInstanceMap: vsphereInstanceMap, + nodeInfoMap: make(map[string]*NodeInfo), + registeredNodes: make(map[string]*v1.Node), + }, + cfg: &cfg, + } + + vs.hostName, err = os.Hostname() + if err != nil { + glog.Errorf("Failed to get hostname. err: %+v", err) + return nil, err } runtime.SetFinalizer(&vs, logout) return &vs, nil } func logout(vs *VSphere) { - if vs.conn.GoVmomiClient != nil { - vs.conn.GoVmomiClient.Logout(context.TODO()) + for _, vsphereIns := range vs.vsphereInstanceMap { + if vsphereIns.conn.GoVmomiClient != nil { + vsphereIns.conn.GoVmomiClient.Logout(context.TODO()) + } } + } // Instances returns an implementation of Instances for vSphere. @@ -284,43 +460,74 @@ func getLocalIP() ([]v1.NodeAddress, error) { return addrs, nil } +func (vs *VSphere) getVSphereInstance(nodeName k8stypes.NodeName) (*VSphereInstance, error) { + vsphereIns, err := vs.nodeManager.GetVSphereInstance(nodeName) + if err != nil { + glog.Errorf("Cannot find node %q in cache. 
Node not found!!!", nodeName) + return nil, err + } + return &vsphereIns, nil +} + +func (vs *VSphere) getVSphereInstanceForServer(vcServer string, ctx context.Context) (*VSphereInstance, error) { + vsphereIns, ok := vs.vsphereInstanceMap[vcServer] + if !ok { + glog.Errorf("cannot find vcServer %q in cache. VC not found!!!", vcServer) + return nil, errors.New(fmt.Sprintf("Cannot find node %q in vsphere configuration map", vcServer)) + } + // Ensure client is logged in and session is valid + err := vsphereIns.conn.Connect(ctx) + if err != nil { + glog.Errorf("failed connecting to vcServer %q with error %+v", vcServer, err) + return nil, err + } + + return vsphereIns, nil +} + // Get the VM Managed Object instance by from the node -func (vs *VSphere) getVMByName(ctx context.Context, nodeName k8stypes.NodeName) (*vclib.VirtualMachine, error) { - dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter) +func (vs *VSphere) getVMFromNodeName(ctx context.Context, nodeName k8stypes.NodeName) (*vclib.VirtualMachine, error) { + nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName) if err != nil { return nil, err } - vmPath := vs.cfg.Global.WorkingDir + "/" + nodeNameToVMName(nodeName) - vm, err := dc.GetVMByPath(ctx, vmPath) - if err != nil { - return nil, err - } - return vm, nil + return nodeInfo.vm, nil } // NodeAddresses is an implementation of Instances.NodeAddresses. func (vs *VSphere) NodeAddresses(nodeName k8stypes.NodeName) ([]v1.NodeAddress, error) { // Get local IP addresses if node is local node - if vs.localInstanceID == nodeNameToVMName(nodeName) { + if vs.hostName == convertToString(nodeName) { return getLocalIP() } + + if vs.cfg == nil { + return nil, cloudprovider.InstanceNotFound + } + + // Below logic can be executed only on master as VC details are present. addrs := []v1.NodeAddress{} // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) + vsi, err := vs.getVSphereInstance(nodeName) if err != nil { return nil, err } - vm, err := vs.getVMByName(ctx, nodeName) + // Ensure client is logged in and session is valid + err = vsi.conn.Connect(ctx) if err != nil { - glog.Errorf("Failed to get VM object for node: %q. err: +%v", nodeNameToVMName(nodeName), err) + return nil, err + } + + vm, err := vs.getVMFromNodeName(ctx, nodeName) + if err != nil { + glog.Errorf("Failed to get VM object for node: %q. err: +%v", convertToString(nodeName), err) return nil, err } vmMoList, err := vm.Datacenter.GetVMMoList(ctx, []*vclib.VirtualMachine{vm}, []string{"guest.net"}) if err != nil { - glog.Errorf("Failed to get VM Managed object with property guest.net for node: %q. err: +%v", nodeNameToVMName(nodeName), err) + glog.Errorf("Failed to get VM Managed object with property guest.net for node: %q. err: +%v", convertToString(nodeName), err) return nil, err } // retrieve VM's ip(s) @@ -348,8 +555,7 @@ func (vs *VSphere) NodeAddresses(nodeName k8stypes.NodeName) ([]v1.NodeAddress, // This method will not be called from the node that is requesting this ID. i.e. 
metadata service // and other local methods cannot be used here func (vs *VSphere) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) { - vmName := path.Base(providerID) - return vs.NodeAddresses(vmNameToNodeName(vmName)) + return vs.NodeAddresses(convertToK8sType(providerID)) } // AddSSHKeyToAllInstances add SSH key to all instances @@ -359,16 +565,14 @@ func (vs *VSphere) AddSSHKeyToAllInstances(user string, keyData []byte) error { // CurrentNodeName gives the current node name func (vs *VSphere) CurrentNodeName(hostname string) (k8stypes.NodeName, error) { - return vmNameToNodeName(vs.localInstanceID), nil + return convertToK8sType(vs.hostName), nil } -// nodeNameToVMName maps a NodeName to the vmware infrastructure name -func nodeNameToVMName(nodeName k8stypes.NodeName) string { +func convertToString(nodeName k8stypes.NodeName) string { return string(nodeName) } -// nodeNameToVMName maps a vmware infrastructure name to a NodeName -func vmNameToNodeName(vmName string) k8stypes.NodeName { +func convertToK8sType(vmName string) k8stypes.NodeName { return k8stypes.NodeName(vmName) } @@ -380,68 +584,73 @@ func (vs *VSphere) ExternalID(nodeName k8stypes.NodeName) (string, error) { // InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running. // If false is returned with no error, the instance will be immediately deleted by the cloud controller manager. func (vs *VSphere) InstanceExistsByProviderID(providerID string) (bool, error) { - vmName := path.Base(providerID) - nodeName := vmNameToNodeName(vmName) - // Create context - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) - if err != nil { - return false, err - } - vm, err := vs.getVMByName(ctx, nodeName) - if err != nil { - if vclib.IsNotFound(err) { - return false, nil - } - glog.Errorf("Failed to get VM object for node: %q. err: +%v", nodeNameToVMName(nodeName), err) - return false, err + _, err := vs.InstanceID(convertToK8sType(providerID)) + if err == nil { + return true, nil } - isActive, err := vm.IsActive(ctx) - if err != nil { - glog.Errorf("Failed to check whether node %q is active. err: %+v.", nodeNameToVMName(nodeName), err) - return false, err - } - if !isActive { - return false, nil - } - - return true, nil + return false, err } // InstanceID returns the cloud provider ID of the node with the specified Name. func (vs *VSphere) InstanceID(nodeName k8stypes.NodeName) (string, error) { - if vs.localInstanceID == nodeNameToVMName(nodeName) { - return vs.cfg.Global.WorkingDir + "/" + vs.localInstanceID, nil - } - // Create context - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) - if err != nil { - return "", err - } - vm, err := vs.getVMByName(ctx, nodeName) - if err != nil { - if vclib.IsNotFound(err) { - return "", cloudprovider.InstanceNotFound + + instanceIDInternal := func() (string, error) { + if vs.hostName == convertToString(nodeName) { + return vs.hostName, nil } - glog.Errorf("Failed to get VM object for node: %q. err: +%v", nodeNameToVMName(nodeName), err) - return "", err + + // Below logic can be performed only on master node where VC details are preset. 
+ if vs.cfg == nil { + return "", fmt.Errorf("The current node can't determine InstanceID for %q", convertToString(nodeName)) + } + + // Create context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + vsi, err := vs.getVSphereInstance(nodeName) + if err != nil { + return "", err + } + // Ensure client is logged in and session is valid + err = vsi.conn.Connect(ctx) + if err != nil { + return "", err + } + vm, err := vs.getVMFromNodeName(ctx, nodeName) + if err != nil { + if err == vclib.ErrNoVMFound { + return "", cloudprovider.InstanceNotFound + } + glog.Errorf("Failed to get VM object for node: %q. err: +%v", convertToString(nodeName), err) + return "", err + } + isActive, err := vm.IsActive(ctx) + if err != nil { + glog.Errorf("Failed to check whether node %q is active. err: %+v.", convertToString(nodeName), err) + return "", err + } + if isActive { + return convertToString(nodeName), nil + } + glog.Warningf("The VM: %s is not in %s state", convertToString(nodeName), vclib.ActivePowerState) + return "", cloudprovider.InstanceNotFound } - isActive, err := vm.IsActive(ctx) + + instanceID, err := instanceIDInternal() if err != nil { - glog.Errorf("Failed to check whether node %q is active. err: %+v.", nodeNameToVMName(nodeName), err) - return "", err + isManagedObjectNotFoundError, err := vs.retry(nodeName, err) + if isManagedObjectNotFoundError { + if err == nil { + glog.V(4).Infof("InstanceID: Found node %q", convertToString(nodeName)) + instanceID, err = instanceIDInternal() + } else if err == vclib.ErrNoVMFound { + return "", cloudprovider.InstanceNotFound + } + } } - if isActive { - return "/" + vm.InventoryPath, nil - } - glog.Warningf("The VM: %s is not in %s state", nodeNameToVMName(nodeName), vclib.ActivePowerState) - return "", cloudprovider.InstanceNotFound + + return instanceID, err } // InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID @@ -486,72 +695,111 @@ func (vs *VSphere) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []st } // AttachDisk attaches given virtual disk volume to the compute running kubelet. -func (vs *VSphere) AttachDisk(vmDiskPath string, storagePolicyID string, nodeName k8stypes.NodeName) (diskUUID string, err error) { - attachDiskInternal := func(vmDiskPath string, storagePolicyID string, nodeName k8stypes.NodeName) (diskUUID string, err error) { +func (vs *VSphere) AttachDisk(vmDiskPath string, storagePolicyName string, nodeName k8stypes.NodeName) (diskUUID string, err error) { + attachDiskInternal := func(vmDiskPath string, storagePolicyName string, nodeName k8stypes.NodeName) (diskUUID string, err error) { if nodeName == "" { - nodeName = vmNameToNodeName(vs.localInstanceID) + nodeName = convertToK8sType(vs.hostName) } // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() + vsi, err := vs.getVSphereInstance(nodeName) + if err != nil { + return "", err + } // Ensure client is logged in and session is valid - err = vs.conn.Connect(ctx) + err = vsi.conn.Connect(ctx) if err != nil { return "", err } - vm, err := vs.getVMByName(ctx, nodeName) + + vm, err := vs.getVMFromNodeName(ctx, nodeName) if err != nil { - glog.Errorf("Failed to get VM object for node: %q. err: +%v", nodeNameToVMName(nodeName), err) + glog.Errorf("Failed to get VM object for node: %q. 
err: +%v", convertToString(nodeName), err) return "", err } - diskUUID, err = vm.AttachDisk(ctx, vmDiskPath, &vclib.VolumeOptions{SCSIControllerType: vclib.PVSCSIControllerType, StoragePolicyID: storagePolicyID}) + + diskUUID, err = vm.AttachDisk(ctx, vmDiskPath, &vclib.VolumeOptions{SCSIControllerType: vclib.PVSCSIControllerType, StoragePolicyName: storagePolicyName}) if err != nil { - glog.Errorf("Failed to attach disk: %s for node: %s. err: +%v", vmDiskPath, nodeNameToVMName(nodeName), err) + glog.Errorf("Failed to attach disk: %s for node: %s. err: +%v", vmDiskPath, convertToString(nodeName), err) return "", err } return diskUUID, nil } requestTime := time.Now() - diskUUID, err = attachDiskInternal(vmDiskPath, storagePolicyID, nodeName) + diskUUID, err = attachDiskInternal(vmDiskPath, storagePolicyName, nodeName) + if err != nil { + isManagedObjectNotFoundError, err := vs.retry(nodeName, err) + if isManagedObjectNotFoundError { + if err == nil { + glog.V(4).Infof("AttachDisk: Found node %q", convertToString(nodeName)) + diskUUID, err = attachDiskInternal(vmDiskPath, storagePolicyName, nodeName) + } + } + } vclib.RecordvSphereMetric(vclib.OperationAttachVolume, requestTime, err) return diskUUID, err } +func (vs *VSphere) retry(nodeName k8stypes.NodeName, err error) (bool, error) { + isManagedObjectNotFoundError := false + if err != nil { + if vclib.IsManagedObjectNotFoundError(err) { + isManagedObjectNotFoundError = true + glog.V(4).Infof("error %q ManagedObjectNotFound for node %q", err, convertToString(nodeName)) + err = vs.nodeManager.RediscoverNode(nodeName) + } + } + return isManagedObjectNotFoundError, err +} + // DetachDisk detaches given virtual disk volume from the compute running kubelet. func (vs *VSphere) DetachDisk(volPath string, nodeName k8stypes.NodeName) error { detachDiskInternal := func(volPath string, nodeName k8stypes.NodeName) error { if nodeName == "" { - nodeName = vmNameToNodeName(vs.localInstanceID) + nodeName = convertToK8sType(vs.hostName) } // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) + vsi, err := vs.getVSphereInstance(nodeName) if err != nil { return err } - vm, err := vs.getVMByName(ctx, nodeName) + // Ensure client is logged in and session is valid + err = vsi.conn.Connect(ctx) + if err != nil { + return err + } + vm, err := vs.getVMFromNodeName(ctx, nodeName) if err != nil { // If node doesn't exist, disk is already detached from node. - if vclib.IsNotFound(err) { - glog.Infof("Node %q does not exist, disk %s is already detached from node.", nodeNameToVMName(nodeName), volPath) + if err == vclib.ErrNoVMFound { + glog.Infof("Node %q does not exist, disk %s is already detached from node.", convertToString(nodeName), volPath) return nil } - glog.Errorf("Failed to get VM object for node: %q. err: +%v", nodeNameToVMName(nodeName), err) + glog.Errorf("Failed to get VM object for node: %q. err: +%v", convertToString(nodeName), err) return err } err = vm.DetachDisk(ctx, volPath) if err != nil { - glog.Errorf("Failed to detach disk: %s for node: %s. err: +%v", volPath, nodeNameToVMName(nodeName), err) + glog.Errorf("Failed to detach disk: %s for node: %s. 
err: +%v", volPath, convertToString(nodeName), err) return err } return nil } requestTime := time.Now() err := detachDiskInternal(volPath, nodeName) - vclib.RecordvSphereMetric(vclib.OperationDetachVolume, requestTime, nil) + if err != nil { + isManagedObjectNotFoundError, err := vs.retry(nodeName, err) + if isManagedObjectNotFoundError { + if err == nil { + err = detachDiskInternal(volPath, nodeName) + } + } + } + vclib.RecordvSphereMetric(vclib.OperationDetachVolume, requestTime, err) return err } @@ -560,22 +808,26 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b diskIsAttachedInternal := func(volPath string, nodeName k8stypes.NodeName) (bool, error) { var vSphereInstance string if nodeName == "" { - vSphereInstance = vs.localInstanceID - nodeName = vmNameToNodeName(vSphereInstance) + vSphereInstance = vs.hostName + nodeName = convertToK8sType(vSphereInstance) } else { - vSphereInstance = nodeNameToVMName(nodeName) + vSphereInstance = convertToString(nodeName) } // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) + vsi, err := vs.getVSphereInstance(nodeName) if err != nil { return false, err } - vm, err := vs.getVMByName(ctx, nodeName) + // Ensure client is logged in and session is valid + err = vsi.conn.Connect(ctx) if err != nil { - if vclib.IsNotFound(err) { + return false, err + } + vm, err := vs.getVMFromNodeName(ctx, nodeName) + if err != nil { + if err == vclib.ErrNoVMFound { glog.Warningf("Node %q does not exist, vsphere CP will assume disk %v is not attached to it.", nodeName, volPath) // make the disk as detached and return false without error. return false, nil @@ -583,7 +835,7 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b glog.Errorf("Failed to get VM object for node: %q. err: +%v", vSphereInstance, err) return false, err } - volPath = vclib.RemoveClusterFromVDiskPath(volPath) + volPath = vclib.RemoveStorageClusterORFolderNameFromVDiskPath(volPath) attached, err := vm.IsDiskAttached(ctx, volPath) if err != nil { glog.Errorf("DiskIsAttached failed to determine whether disk %q is still attached on node %q", @@ -594,57 +846,144 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b } requestTime := time.Now() isAttached, err := diskIsAttachedInternal(volPath, nodeName) + if err != nil { + isManagedObjectNotFoundError, err := vs.retry(nodeName, err) + if isManagedObjectNotFoundError { + if err == vclib.ErrNoVMFound { + isAttached, err = false, nil + } else if err == nil { + isAttached, err = diskIsAttachedInternal(volPath, nodeName) + } + } + } vclib.RecordvSphereMetric(vclib.OperationDiskIsAttached, requestTime, err) return isAttached, err } // DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin. +// 1. Converts volPaths into canonical form so that it can be compared with the VM device path. +// 2. Segregates nodes by vCenter and Datacenter they are present in. This reduces calls to VC. +// 3. Creates go routines per VC-DC to find whether disks are attached to the nodes. +// 4. If the some of the VMs are not found or migrated then they are added to a list. +// 5. After successful execution of goroutines, +// 5a. If there are any VMs which needs to be retried, they are rediscovered and the whole operation is initiated again for only rediscovered VMs. +// 5b. 
If VMs are removed from vSphere inventory they are ignored. func (vs *VSphere) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) { disksAreAttachedInternal := func(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) { - attached := make(map[k8stypes.NodeName]map[string]bool) - if len(nodeVolumes) == 0 { - return attached, nil + + // disksAreAttach checks whether disks are attached to the nodes. + // Returns nodes that need to be retried if retry is true + // Segregates nodes per VC and DC + // Creates go routines per VC-DC to find whether disks are attached to the nodes. + disksAreAttach := func(ctx context.Context, nodeVolumes map[k8stypes.NodeName][]string, attached map[string]map[string]bool, retry bool) ([]k8stypes.NodeName, error) { + + var wg sync.WaitGroup + var localAttachedMaps []map[string]map[string]bool + var nodesToRetry []k8stypes.NodeName + var globalErr error + globalErr = nil + globalErrMutex := &sync.Mutex{} + nodesToRetryMutex := &sync.Mutex{} + + // Segregate nodes according to VC-DC + dcNodes := make(map[string][]k8stypes.NodeName) + for nodeName := range nodeVolumes { + nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName) + if err != nil { + glog.Errorf("Failed to get node info: %+v. err: %+v", nodeInfo.vm, err) + return nodesToRetry, err + } + VC_DC := nodeInfo.vcServer + nodeInfo.dataCenter.String() + dcNodes[VC_DC] = append(dcNodes[VC_DC], nodeName) + } + + for _, nodes := range dcNodes { + localAttachedMap := make(map[string]map[string]bool) + localAttachedMaps = append(localAttachedMaps, localAttachedMap) + // Start go routines per VC-DC to check disks are attached + go func() { + nodesToRetryLocal, err := vs.checkDiskAttached(ctx, nodes, nodeVolumes, localAttachedMap, retry) + if err != nil { + if !vclib.IsManagedObjectNotFoundError(err) { + globalErrMutex.Lock() + globalErr = err + globalErrMutex.Unlock() + glog.Errorf("Failed to check disk attached for nodes: %+v. err: %+v", nodes, err) + } + } + nodesToRetryMutex.Lock() + nodesToRetry = append(nodesToRetry, nodesToRetryLocal...) + nodesToRetryMutex.Unlock() + wg.Done() + }() + wg.Add(1) + } + wg.Wait() + if globalErr != nil { + return nodesToRetry, globalErr + } + for _, localAttachedMap := range localAttachedMaps { + for key, value := range localAttachedMap { + attached[key] = value + } + } + + return nodesToRetry, nil } + + glog.V(4).Info("Starting DisksAreAttached API for vSphere with nodeVolumes: %+v", nodeVolumes) // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) + + disksAttached := make(map[k8stypes.NodeName]map[string]bool) + if len(nodeVolumes) == 0 { + return disksAttached, nil + } + + // Convert VolPaths into canonical form so that it can be compared with the VM device path. + vmVolumes, err := vs.convertVolPathsToDevicePaths(ctx, nodeVolumes) if err != nil { + glog.Errorf("Failed to convert volPaths to devicePaths: %+v. 
err: %+v", nodeVolumes, err) return nil, err } - dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter) + attached := make(map[string]map[string]bool) + nodesToRetry, err := disksAreAttach(ctx, vmVolumes, attached, false) if err != nil { return nil, err } - vmVolumes := make(map[string][]string) - for nodeName, volPaths := range nodeVolumes { - for i, volPath := range volPaths { - volPath = vclib.RemoveClusterFromVDiskPath(volPath) - // Get the canonical volume path for volPath. - canonicalVolumePath, err := getcanonicalVolumePath(ctx, dc, volPath) + if len(nodesToRetry) != 0 { + // Rediscover nodes which are need to be retried + remainingNodesVolumes := make(map[k8stypes.NodeName][]string) + for _, nodeName := range nodesToRetry { + err = vs.nodeManager.RediscoverNode(nodeName) if err != nil { - glog.Errorf("Failed to get canonical vsphere volume path for volume: %s. err: %+v", volPath, err) + if err == vclib.ErrNoVMFound { + glog.V(4).Infof("node %s not found. err: %+v", nodeName, err) + continue + } + glog.Errorf("Failed to rediscover node %s. err: %+v", nodeName, err) return nil, err } - // Check if the volume path contains .vmdk extension. If not, add the extension and update the nodeVolumes Map - if len(canonicalVolumePath) > 0 && filepath.Ext(canonicalVolumePath) != ".vmdk" { - canonicalVolumePath += ".vmdk" - } - volPaths[i] = canonicalVolumePath + remainingNodesVolumes[nodeName] = nodeVolumes[nodeName] + } + + // If some remaining nodes are still registered + if len(remainingNodesVolumes) != 0 { + nodesToRetry, err = disksAreAttach(ctx, remainingNodesVolumes, attached, true) + if err != nil || len(nodesToRetry) != 0 { + glog.Errorf("Failed to retry disksAreAttach for nodes %+v. err: %+v", remainingNodesVolumes, err) + return nil, err + } + } + + for nodeName, volPaths := range attached { + disksAttached[convertToK8sType(nodeName)] = volPaths } - vmVolumes[nodeNameToVMName(nodeName)] = volPaths } - // Check if the disks are attached to their respective nodes - disksAttachedList, err := dc.CheckDisksAttached(ctx, vmVolumes) - if err != nil { - return nil, err - } - for vmName, volPaths := range disksAttachedList { - attached[vmNameToNodeName(vmName)] = volPaths - } - return attached, nil + glog.V(4).Infof("DisksAreAttach successfully executed. result: %+v", attached) + return disksAttached, nil } requestTime := time.Now() attached, err := disksAreAttachedInternal(nodeVolumes) @@ -660,9 +999,9 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo glog.V(1).Infof("Starting to create a vSphere volume with volumeOptions: %+v", volumeOptions) createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) { var datastore string - // Default datastore is the datastore in the vSphere config file that is used to initialize vSphere cloud provider. 
+ // If datastore not specified, then use default datastore if volumeOptions.Datastore == "" { - datastore = vs.cfg.Global.Datastore + datastore = vs.cfg.Workspace.DefaultDatastore } else { datastore = volumeOptions.Datastore } @@ -670,12 +1009,11 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Ensure client is logged in and session is valid - err = vs.conn.Connect(ctx) + vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx) if err != nil { return "", err } - dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter) + dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter) if err != nil { return "", err } @@ -693,18 +1031,37 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo cleanUpRoutineInitialized = true } cleanUpRoutineInitLock.Unlock() - vmOptions, err = vs.setVMOptions(ctx, dc) + vmOptions, err = vs.setVMOptions(ctx, dc, vs.cfg.Workspace.ResourcePoolPath) if err != nil { glog.Errorf("Failed to set VM options requires to create a vsphere volume. err: %+v", err) return "", err } } if volumeOptions.StoragePolicyName != "" && volumeOptions.Datastore == "" { - datastore, err = getPbmCompatibleDatastore(ctx, dc.Client(), volumeOptions.StoragePolicyName, vmOptions.VMFolder) + datastore, err = getPbmCompatibleDatastore(ctx, dc, volumeOptions.StoragePolicyName, vs.nodeManager) if err != nil { glog.Errorf("Failed to get pbm compatible datastore with storagePolicy: %s. err: %+v", volumeOptions.StoragePolicyName, err) return "", err } + } else { + // Since no storage policy is specified but datastore is specified, check + // if the given datastore is a shared datastore across all node VMs. 
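// Illustrative sketch, not from the patch: the datastore-selection branching that
// CreateVolume implements here, reduced to its essentials. Placement always starts
// from the set of datastores shared by every node VM: with a storage policy and no
// explicit datastore, the PBM-compatible, most-free shared datastore wins;
// otherwise the requested datastore (or the Workspace default) is accepted only if
// it is in the shared set. The helper and its parameters are simplified stand-ins.
package main

import (
	"errors"
	"fmt"
)

func chooseDatastore(requested, defaultDS, policy string, shared []string, pbmPick func([]string) (string, error)) (string, error) {
	if policy != "" && requested == "" {
		return pbmPick(shared) // most-free datastore compatible with the policy
	}
	ds := requested
	if ds == "" {
		ds = defaultDS // default-datastore from the Workspace section
	}
	for _, s := range shared {
		if s == ds {
			return ds, nil
		}
	}
	return "", errors.New("the specified datastore is not a shared datastore across node VMs")
}

func main() {
	shared := []string{"vsanDatastore", "sharedVmfs-0"}
	ds, err := chooseDatastore("", "vsanDatastore", "", shared, nil)
	fmt.Println(ds, err) // vsanDatastore <nil>
}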
+ sharedDsList, err := getSharedDatastoresInK8SCluster(ctx, dc, vs.nodeManager) + if err != nil { + glog.Errorf("Failed to get shared datastore: %+v", err) + return "", err + } + found := false + for _, sharedDs := range sharedDsList { + if datastore == sharedDs.Info.Name { + found = true + break + } + } + if !found { + msg := fmt.Sprintf("The specified datastore %s is not a shared datastore across node VMs", datastore) + return "", errors.New(msg) + } } ds, err := dc.GetDatastoreByName(ctx, datastore) if err != nil { @@ -743,7 +1100,7 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVo requestTime := time.Now() canonicalVolumePath, err = createVolumeInternal(volumeOptions) vclib.RecordCreateVolumeMetric(volumeOptions, requestTime, err) - glog.V(1).Infof("The canonical volume path for the newly created vSphere volume is %q", canonicalVolumePath) + glog.V(4).Infof("The canonical volume path for the newly created vSphere volume is %q", canonicalVolumePath) return canonicalVolumePath, err } @@ -754,16 +1111,11 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error { // Create context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) + vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx) if err != nil { return err } - dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter) - if err != nil { - return err - } - ds, err := dc.GetDatastoreByName(ctx, vs.cfg.Global.Datastore) + dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter) if err != nil { return err } @@ -772,7 +1124,7 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error { VolumeOptions: &vclib.VolumeOptions{}, VMOptions: &vclib.VMOptions{}, } - err = disk.Delete(ctx, ds) + err = disk.Delete(ctx, dc) if err != nil { glog.Errorf("Failed to delete vsphere volume with vmDiskPath: %s. err: %+v", vmDiskPath, err) } @@ -788,3 +1140,27 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error { func (vs *VSphere) HasClusterID() bool { return true } + +// Notification handler when node is added into k8s cluster. +func (vs *VSphere) NodeAdded(obj interface{}) { + node, ok := obj.(*v1.Node) + if node == nil || !ok { + glog.Warningf("NodeAdded: unrecognized object %+v", obj) + return + } + + glog.V(4).Infof("Node added: %+v", node) + vs.nodeManager.RegisterNode(node) +} + +// Notification handler when node is removed from k8s cluster. 
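// Illustrative sketch, not from the patch: NodeAdded and NodeDeleted are plain
// callbacks, so they can be driven by a client-go Node informer. The
// registerNodeHandlers helper below is hypothetical (the hook the provider uses
// to obtain the informer factory is not shown in this hunk); only the client-go
// calls themselves are real API.
package example

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// nodeEventSink is the subset of the provider used here: the two handlers above and below.
type nodeEventSink interface {
	NodeAdded(obj interface{})
	NodeDeleted(obj interface{})
}

// registerNodeHandlers (hypothetical) wires the provider's node handlers into a
// shared Node informer so each node is registered with, and unregistered from,
// the node manager as it joins or leaves the cluster.
func registerNodeHandlers(client kubernetes.Interface, sink nodeEventSink, stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactory(client, 0)
	factory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    sink.NodeAdded,
		DeleteFunc: sink.NodeDeleted,
	})
	factory.Start(stopCh)
}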
+func (vs *VSphere) NodeDeleted(obj interface{}) { + node, ok := obj.(*v1.Node) + if node == nil || !ok { + glog.Warningf("NodeDeleted: unrecognized object %+v", obj) + return + } + + glog.V(4).Infof("Node deleted: %+v", node) + vs.nodeManager.UnRegisterNode(node) +} diff --git a/pkg/cloudprovider/providers/vsphere/vsphere_test.go b/pkg/cloudprovider/providers/vsphere/vsphere_test.go index b8b54e99aef..b49224124e9 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere_test.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere_test.go @@ -39,7 +39,7 @@ func configFromEnv() (cfg VSphereConfig, ok bool) { cfg.Global.Password = os.Getenv("VSPHERE_PASSWORD") cfg.Global.Datacenter = os.Getenv("VSPHERE_DATACENTER") cfg.Network.PublicNetwork = os.Getenv("VSPHERE_PUBLIC_NETWORK") - cfg.Global.Datastore = os.Getenv("VSPHERE_DATASTORE") + cfg.Global.DefaultDatastore = os.Getenv("VSPHERE_DATASTORE") cfg.Disk.SCSIControllerType = os.Getenv("VSPHERE_SCSICONTROLLER_TYPE") cfg.Global.WorkingDir = os.Getenv("VSPHERE_WORKING_DIR") cfg.Global.VMName = os.Getenv("VSPHERE_VM_NAME") @@ -103,7 +103,7 @@ func TestNewVSphere(t *testing.T) { t.Skipf("No config found in environment") } - _, err := newVSphere(cfg) + _, err := newControllerNode(cfg) if err != nil { t.Fatalf("Failed to construct/authenticate vSphere: %s", err) } @@ -116,7 +116,7 @@ func TestVSphereLogin(t *testing.T) { } // Create vSphere configuration object - vs, err := newVSphere(cfg) + vs, err := newControllerNode(cfg) if err != nil { t.Fatalf("Failed to construct/authenticate vSphere: %s", err) } @@ -126,11 +126,16 @@ func TestVSphereLogin(t *testing.T) { defer cancel() // Create vSphere client - err = vs.conn.Connect(ctx) + var vcInstance *VSphereInstance + if vcInstance, ok = vs.vsphereInstanceMap[cfg.Global.VCenterIP]; !ok { + t.Fatalf("Couldn't get vSphere instance: %s", cfg.Global.VCenterIP) + } + + err = vcInstance.conn.Connect(ctx) if err != nil { t.Errorf("Failed to connect to vSphere: %s", err) } - defer vs.conn.GoVmomiClient.Logout(ctx) + defer vcInstance.conn.GoVmomiClient.Logout(ctx) } func TestZones(t *testing.T) { @@ -154,7 +159,7 @@ func TestInstances(t *testing.T) { t.Skipf("No config found in environment") } - vs, err := newVSphere(cfg) + vs, err := newControllerNode(cfg) if err != nil { t.Fatalf("Failed to construct/authenticate vSphere: %s", err) } @@ -213,7 +218,7 @@ func TestVolumes(t *testing.T) { t.Skipf("No config found in environment") } - vs, err := newVSphere(cfg) + vs, err := newControllerNode(cfg) if err != nil { t.Fatalf("Failed to construct/authenticate vSphere: %s", err) } diff --git a/pkg/cloudprovider/providers/vsphere/vsphere_util.go b/pkg/cloudprovider/providers/vsphere/vsphere_util.go index 3fbe2b621f1..efedb062139 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere_util.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere_util.go @@ -28,14 +28,16 @@ import ( "github.com/golang/glog" "github.com/vmware/govmomi" - "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/vim25" - "github.com/vmware/govmomi/vim25/mo" "fmt" + "github.com/vmware/govmomi/vim25/mo" + "k8s.io/api/core/v1" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib" "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers" + "path/filepath" ) const ( @@ -55,10 +57,28 @@ func GetVSphere() (*VSphere, error) { return nil, err } vSphereConn.GoVmomiClient = client + vsphereIns := &VSphereInstance{ + conn: vSphereConn, + cfg: &VirtualCenterConfig{ + User: cfg.Global.User, 
+ Password: cfg.Global.Password, + VCenterPort: cfg.Global.VCenterPort, + Datacenters: cfg.Global.Datacenters, + RoundTripperCount: cfg.Global.RoundTripperCount, + }, + } + vsphereInsMap := make(map[string]*VSphereInstance) + vsphereInsMap[cfg.Global.VCenterIP] = vsphereIns + // TODO: Initialize nodeManager and set it in VSphere. vs := &VSphere{ - conn: vSphereConn, - cfg: cfg, - localInstanceID: "", + vsphereInstanceMap: vsphereInsMap, + hostName: "", + cfg: cfg, + nodeManager: &NodeManager{ + vsphereInstanceMap: vsphereInsMap, + nodeInfoMap: make(map[string]*NodeInfo), + registeredNodes: make(map[string]*v1.Node), + }, } runtime.SetFinalizer(vs, logout) return vs, nil @@ -70,14 +90,18 @@ func getVSphereConfig() *VSphereConfig { cfg.Global.VCenterPort = os.Getenv("VSPHERE_VCENTER_PORT") cfg.Global.User = os.Getenv("VSPHERE_USER") cfg.Global.Password = os.Getenv("VSPHERE_PASSWORD") - cfg.Global.Datacenter = os.Getenv("VSPHERE_DATACENTER") - cfg.Global.Datastore = os.Getenv("VSPHERE_DATASTORE") + cfg.Global.Datacenters = os.Getenv("VSPHERE_DATACENTER") + cfg.Global.DefaultDatastore = os.Getenv("VSPHERE_DATASTORE") cfg.Global.WorkingDir = os.Getenv("VSPHERE_WORKING_DIR") cfg.Global.VMName = os.Getenv("VSPHERE_VM_NAME") cfg.Global.InsecureFlag = false if strings.ToLower(os.Getenv("VSPHERE_INSECURE")) == "true" { cfg.Global.InsecureFlag = true } + cfg.Workspace.VCenterIP = cfg.Global.VCenterIP + cfg.Workspace.Datacenter = cfg.Global.Datacenters + cfg.Workspace.DefaultDatastore = cfg.Global.DefaultDatastore + cfg.Workspace.Folder = cfg.Global.WorkingDir return &cfg } @@ -127,49 +151,83 @@ func getvmUUID() (string, error) { return uuid, nil } -// Get all datastores accessible for the virtual machine object. -func getSharedDatastoresInK8SCluster(ctx context.Context, folder *vclib.Folder) ([]*vclib.Datastore, error) { - vmList, err := folder.GetVirtualMachines(ctx) +// Returns the accessible datastores for the given node VM. +func getAccessibleDatastores(ctx context.Context, nodeVmDetail *NodeDetails, nodeManager *NodeManager) ([]*vclib.DatastoreInfo, error) { + accessibleDatastores, err := nodeVmDetail.vm.GetAllAccessibleDatastores(ctx) if err != nil { - glog.Errorf("Failed to get virtual machines in the kubernetes cluster: %s, err: %+v", folder.InventoryPath, err) - return nil, err + // Check if the node VM is not found which indicates that the node info in the node manager is stale. + // If so, rediscover the node and retry. + if vclib.IsManagedObjectNotFoundError(err) { + glog.V(4).Infof("error %q ManagedObjectNotFound for node %q. 
Rediscovering...", err, nodeVmDetail.NodeName) + err = nodeManager.RediscoverNode(convertToK8sType(nodeVmDetail.NodeName)) + if err == nil { + glog.V(4).Infof("Discovered node %s successfully", nodeVmDetail.NodeName) + nodeInfo, err := nodeManager.GetNodeInfo(convertToK8sType(nodeVmDetail.NodeName)) + if err != nil { + glog.V(4).Infof("error %q getting node info for node %+v", err, nodeVmDetail) + return nil, err + } + + accessibleDatastores, err = nodeInfo.vm.GetAllAccessibleDatastores(ctx) + if err != nil { + glog.V(4).Infof("error %q getting accessible datastores for node %+v", err, nodeVmDetail) + return nil, err + } + } else { + glog.V(4).Infof("error %q rediscovering node %+v", err, nodeVmDetail) + return nil, err + } + } else { + glog.V(4).Infof("error %q getting accessible datastores for node %+v", err, nodeVmDetail) + return nil, err + } } - if vmList == nil || len(vmList) == 0 { - glog.Errorf("No virtual machines found in the kubernetes cluster: %s", folder.InventoryPath) - return nil, fmt.Errorf("No virtual machines found in the kubernetes cluster: %s", folder.InventoryPath) + return accessibleDatastores, nil +} + +// Get all datastores accessible for the virtual machine object. +func getSharedDatastoresInK8SCluster(ctx context.Context, dc *vclib.Datacenter, nodeManager *NodeManager) ([]*vclib.DatastoreInfo, error) { + nodeVmDetails := nodeManager.GetNodeDetails() + if nodeVmDetails == nil || len(nodeVmDetails) == 0 { + msg := fmt.Sprintf("Kubernetes node nodeVmDetail details is empty. nodeVmDetails : %+v", nodeVmDetails) + glog.Error(msg) + return nil, fmt.Errorf(msg) } - index := 0 - var sharedDatastores []*vclib.Datastore - for _, vm := range vmList { - vmName, err := vm.ObjectName(ctx) + var sharedDatastores []*vclib.DatastoreInfo + for index, nodeVmDetail := range nodeVmDetails { + glog.V(9).Infof("Getting accessible datastores for node %s", nodeVmDetail.NodeName) + accessibleDatastores, err := getAccessibleDatastores(ctx, &nodeVmDetail, nodeManager) if err != nil { return nil, err } - if !strings.HasPrefix(vmName, DummyVMPrefixName) { - accessibleDatastores, err := vm.GetAllAccessibleDatastores(ctx) - if err != nil { - return nil, err + if index == 0 { + sharedDatastores = accessibleDatastores + } else { + sharedDatastores = intersect(sharedDatastores, accessibleDatastores) + if len(sharedDatastores) == 0 { + return nil, fmt.Errorf("No shared datastores found in the Kubernetes cluster for nodeVmDetails: %+v", nodeVmDetails) } - if index == 0 { - sharedDatastores = accessibleDatastores - } else { - sharedDatastores = intersect(sharedDatastores, accessibleDatastores) - if len(sharedDatastores) == 0 { - return nil, fmt.Errorf("No shared datastores found in the Kubernetes cluster: %s", folder.InventoryPath) - } - } - index++ } } + glog.V(9).Infof("sharedDatastores : %+v", sharedDatastores) + sharedDatastores, err := getDatastoresForEndpointVC(ctx, dc, sharedDatastores) + if err != nil { + glog.Errorf("Failed to get shared datastores from endpoint VC. 
err: %+v", err) + return nil, err + } + glog.V(9).Infof("sharedDatastores at endpoint VC: %+v", sharedDatastores) return sharedDatastores, nil } -func intersect(list1 []*vclib.Datastore, list2 []*vclib.Datastore) []*vclib.Datastore { - var sharedDs []*vclib.Datastore +func intersect(list1 []*vclib.DatastoreInfo, list2 []*vclib.DatastoreInfo) []*vclib.DatastoreInfo { + glog.V(9).Infof("list1: %+v", list1) + glog.V(9).Infof("list2: %+v", list2) + var sharedDs []*vclib.DatastoreInfo for _, val1 := range list1 { // Check if val1 is found in list2 for _, val2 := range list2 { - if val1.Reference().Value == val2.Reference().Value { + // Intersection is performed based on the datastoreUrl as this uniquely identifies the datastore. + if val1.Info.Url == val2.Info.Url { sharedDs = append(sharedDs, val1) break } @@ -178,46 +236,42 @@ func intersect(list1 []*vclib.Datastore, list2 []*vclib.Datastore) []*vclib.Data return sharedDs } -// Get the datastores accessible for the virtual machine object. -func getAllAccessibleDatastores(ctx context.Context, client *vim25.Client, vmMo mo.VirtualMachine) ([]string, error) { - host := vmMo.Summary.Runtime.Host - if host == nil { - return nil, errors.New("VM doesn't have a HostSystem") - } - var hostSystemMo mo.HostSystem - s := object.NewSearchIndex(client) - err := s.Properties(ctx, host.Reference(), []string{DatastoreProperty}, &hostSystemMo) - if err != nil { - return nil, err - } - var dsRefValues []string - for _, dsRef := range hostSystemMo.Datastore { - dsRefValues = append(dsRefValues, dsRef.Value) - } - return dsRefValues, nil -} - // getMostFreeDatastore gets the best fit compatible datastore by free space. -func getMostFreeDatastoreName(ctx context.Context, client *vim25.Client, dsObjList []*vclib.Datastore) (string, error) { - dsMoList, err := dsObjList[0].Datacenter.GetDatastoreMoList(ctx, dsObjList, []string{DatastoreInfoProperty}) - if err != nil { - return "", err - } +func getMostFreeDatastoreName(ctx context.Context, client *vim25.Client, dsInfoList []*vclib.DatastoreInfo) (string, error) { var curMax int64 curMax = -1 var index int - for i, dsMo := range dsMoList { - dsFreeSpace := dsMo.Info.GetDatastoreInfo().FreeSpace + for i, dsInfo := range dsInfoList { + dsFreeSpace := dsInfo.Info.GetDatastoreInfo().FreeSpace if dsFreeSpace > curMax { curMax = dsFreeSpace index = i } } - return dsMoList[index].Info.GetDatastoreInfo().Name, nil + return dsInfoList[index].Info.GetDatastoreInfo().Name, nil } -func getPbmCompatibleDatastore(ctx context.Context, client *vim25.Client, storagePolicyName string, folder *vclib.Folder) (string, error) { - pbmClient, err := vclib.NewPbmClient(ctx, client) +// Returns the datastores in the given datacenter by performing lookup based on datastore URL. 
+func getDatastoresForEndpointVC(ctx context.Context, dc *vclib.Datacenter, sharedDsInfos []*vclib.DatastoreInfo) ([]*vclib.DatastoreInfo, error) { + var datastores []*vclib.DatastoreInfo + allDsInfoMap, err := dc.GetAllDatastores(ctx) + if err != nil { + return nil, err + } + for _, sharedDsInfo := range sharedDsInfos { + dsInfo, ok := allDsInfoMap[sharedDsInfo.Info.Url] + if ok { + datastores = append(datastores, dsInfo) + } else { + glog.V(4).Infof("Warning: Shared datastore with URL %s does not exist in endpoint VC", sharedDsInfo.Info.Url) + } + } + glog.V(9).Infof("Datastore from endpoint VC: %+v", datastores) + return datastores, nil +} + +func getPbmCompatibleDatastore(ctx context.Context, dc *vclib.Datacenter, storagePolicyName string, nodeManager *NodeManager) (string, error) { + pbmClient, err := vclib.NewPbmClient(ctx, dc.Client()) if err != nil { return "", err } @@ -226,35 +280,40 @@ func getPbmCompatibleDatastore(ctx context.Context, client *vim25.Client, storag glog.Errorf("Failed to get Profile ID by name: %s. err: %+v", storagePolicyName, err) return "", err } - sharedDsList, err := getSharedDatastoresInK8SCluster(ctx, folder) + sharedDs, err := getSharedDatastoresInK8SCluster(ctx, dc, nodeManager) if err != nil { - glog.Errorf("Failed to get shared datastores from kubernetes cluster: %s. err: %+v", folder.InventoryPath, err) + glog.Errorf("Failed to get shared datastores. err: %+v", err) return "", err } - compatibleDatastores, _, err := pbmClient.GetCompatibleDatastores(ctx, storagePolicyID, sharedDsList) + if len(sharedDs) == 0 { + msg := "No shared datastores found in the endpoint virtual center" + glog.Errorf(msg) + return "", errors.New(msg) + } + compatibleDatastores, _, err := pbmClient.GetCompatibleDatastores(ctx, dc, storagePolicyID, sharedDs) if err != nil { - glog.Errorf("Failed to get compatible datastores from datastores : %+v with storagePolicy: %s. err: %+v", sharedDsList, storagePolicyID, err) + glog.Errorf("Failed to get compatible datastores from datastores : %+v with storagePolicy: %s. err: %+v", + sharedDs, storagePolicyID, err) return "", err } - datastore, err := getMostFreeDatastoreName(ctx, client, compatibleDatastores) + glog.V(9).Infof("compatibleDatastores : %+v", compatibleDatastores) + datastore, err := getMostFreeDatastoreName(ctx, dc.Client(), compatibleDatastores) if err != nil { glog.Errorf("Failed to get most free datastore from compatible datastores: %+v. 
err: %+v", compatibleDatastores, err) return "", err } + glog.V(4).Infof("Most free datastore : %+s", datastore) return datastore, err } -func (vs *VSphere) setVMOptions(ctx context.Context, dc *vclib.Datacenter) (*vclib.VMOptions, error) { +func (vs *VSphere) setVMOptions(ctx context.Context, dc *vclib.Datacenter, resourcePoolPath string) (*vclib.VMOptions, error) { var vmOptions vclib.VMOptions - vm, err := dc.GetVMByPath(ctx, vs.cfg.Global.WorkingDir+"/"+vs.localInstanceID) + resourcePool, err := dc.GetResourcePool(ctx, resourcePoolPath) if err != nil { return nil, err } - resourcePool, err := vm.GetResourcePool(ctx) - if err != nil { - return nil, err - } - folder, err := dc.GetFolderByPath(ctx, vs.cfg.Global.WorkingDir) + glog.V(9).Infof("Resource pool path %s, resourcePool %+v", resourcePoolPath, resourcePool) + folder, err := dc.GetFolderByPath(ctx, vs.cfg.Workspace.Folder) if err != nil { return nil, err } @@ -270,28 +329,27 @@ func (vs *VSphere) cleanUpDummyVMs(dummyVMPrefix string) { defer cancel() for { time.Sleep(CleanUpDummyVMRoutineInterval * time.Minute) - // Ensure client is logged in and session is valid - err := vs.conn.Connect(ctx) + vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx) if err != nil { - glog.V(4).Infof("Failed to connect to VC with err: %+v. Retrying again...", err) + glog.V(4).Infof("Failed to get VSphere instance with err: %+v. Retrying again...", err) continue } - dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter) + dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter) if err != nil { - glog.V(4).Infof("Failed to get the datacenter: %s from VC. err: %+v", vs.cfg.Global.Datacenter, err) + glog.V(4).Infof("Failed to get the datacenter: %s from VC. err: %+v", vs.cfg.Workspace.Datacenter, err) continue } // Get the folder reference for global working directory where the dummy VM needs to be created. - vmFolder, err := dc.GetFolderByPath(ctx, vs.cfg.Global.WorkingDir) + vmFolder, err := dc.GetFolderByPath(ctx, vs.cfg.Workspace.Folder) if err != nil { - glog.V(4).Infof("Unable to get the kubernetes folder: %q reference. err: %+v", vs.cfg.Global.WorkingDir, err) + glog.V(4).Infof("Unable to get the kubernetes folder: %q reference. err: %+v", vs.cfg.Workspace.Folder, err) continue } // A write lock is acquired to make sure the cleanUp routine doesn't delete any VM's created by ongoing PVC requests. defer cleanUpDummyVMLock.Lock() err = diskmanagers.CleanUpDummyVMs(ctx, vmFolder, dc) if err != nil { - glog.V(4).Infof("Unable to clean up dummy VM's in the kubernetes cluster: %q. err: %+v", vs.cfg.Global.WorkingDir, err) + glog.V(4).Infof("Unable to clean up dummy VM's in the kubernetes cluster: %q. err: %+v", vs.cfg.Workspace.Folder, err) } } } @@ -353,3 +411,118 @@ func setdatastoreFolderIDMap( } folderNameIDMap[folderName] = folderID } + +func convertVolPathToDevicePath(ctx context.Context, dc *vclib.Datacenter, volPath string) (string, error) { + volPath = vclib.RemoveStorageClusterORFolderNameFromVDiskPath(volPath) + // Get the canonical volume path for volPath. + canonicalVolumePath, err := getcanonicalVolumePath(ctx, dc, volPath) + if err != nil { + glog.Errorf("Failed to get canonical vsphere volume path for volume: %s. err: %+v", volPath, err) + return "", err + } + // Check if the volume path contains .vmdk extension. 
If not, add the extension and update the nodeVolumes Map + if len(canonicalVolumePath) > 0 && filepath.Ext(canonicalVolumePath) != ".vmdk" { + canonicalVolumePath += ".vmdk" + } + return canonicalVolumePath, nil +} + +// convertVolPathsToDevicePaths removes cluster or folder path from volPaths and convert to canonicalPath +func (vs *VSphere) convertVolPathsToDevicePaths(ctx context.Context, nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName][]string, error) { + vmVolumes := make(map[k8stypes.NodeName][]string) + for nodeName, volPaths := range nodeVolumes { + nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName) + if err != nil { + return nil, err + } + + _, err = vs.getVSphereInstanceForServer(nodeInfo.vcServer, ctx) + if err != nil { + return nil, err + } + + for i, volPath := range volPaths { + deviceVolPath, err := convertVolPathToDevicePath(ctx, nodeInfo.dataCenter, volPath) + if err != nil { + glog.Errorf("Failed to convert vsphere volume path %s to device path for volume %s. err: %+v", volPath, deviceVolPath, err) + return nil, err + } + volPaths[i] = deviceVolPath + } + vmVolumes[nodeName] = volPaths + } + return vmVolumes, nil +} + +// checkDiskAttached verifies volumes are attached to the VMs which are in same vCenter and Datacenter +// Returns nodes if exist any for which VM is not found in that vCenter and Datacenter +func (vs *VSphere) checkDiskAttached(ctx context.Context, nodes []k8stypes.NodeName, nodeVolumes map[k8stypes.NodeName][]string, attached map[string]map[string]bool, retry bool) ([]k8stypes.NodeName, error) { + var nodesToRetry []k8stypes.NodeName + var vmList []*vclib.VirtualMachine + var nodeInfo NodeInfo + var err error + + for _, nodeName := range nodes { + nodeInfo, err = vs.nodeManager.GetNodeInfo(nodeName) + if err != nil { + return nodesToRetry, err + } + vmList = append(vmList, nodeInfo.vm) + } + + // Making sure session is valid + _, err = vs.getVSphereInstanceForServer(nodeInfo.vcServer, ctx) + if err != nil { + return nodesToRetry, err + } + + // If any of the nodes are not present property collector query will fail for entire operation + vmMoList, err := nodeInfo.dataCenter.GetVMMoList(ctx, vmList, []string{"config.hardware.device", "name", "config.uuid"}) + if err != nil { + if vclib.IsManagedObjectNotFoundError(err) && !retry { + glog.V(4).Infof("checkDiskAttached: ManagedObjectNotFound for property collector query for nodes: %+v vms: %+v", nodes, vmList) + // Property Collector Query failed + // VerifyVolumePaths per VM + for _, nodeName := range nodes { + nodeInfo, err := vs.nodeManager.GetNodeInfo(nodeName) + if err != nil { + return nodesToRetry, err + } + devices, err := nodeInfo.vm.VirtualMachine.Device(ctx) + if err != nil { + if vclib.IsManagedObjectNotFoundError(err) { + glog.V(4).Infof("checkDiskAttached: ManagedObjectNotFound for Kubernetes node: %s with vSphere Virtual Machine reference: %v", nodeName, nodeInfo.vm) + nodesToRetry = append(nodesToRetry, nodeName) + continue + } + return nodesToRetry, err + } + glog.V(4).Infof("Verifying Volume Paths by devices for node %s and VM %s", nodeName, nodeInfo.vm) + vclib.VerifyVolumePathsForVMDevices(devices, nodeVolumes[nodeName], convertToString(nodeName), attached) + } + } + return nodesToRetry, err + } + + vmMoMap := make(map[string]mo.VirtualMachine) + for _, vmMo := range vmMoList { + if vmMo.Config == nil { + glog.Errorf("Config is not available for VM: %q", vmMo.Name) + continue + } + glog.V(9).Infof("vmMoMap vmname: %q vmuuid: %s", vmMo.Name, 
strings.ToLower(vmMo.Config.Uuid)) + vmMoMap[strings.ToLower(vmMo.Config.Uuid)] = vmMo + } + + glog.V(9).Infof("vmMoMap: +%v", vmMoMap) + + for _, nodeName := range nodes { + node, err := vs.nodeManager.GetNode(nodeName) + if err != nil { + return nodesToRetry, err + } + glog.V(9).Infof("Verifying volume for nodeName: %q with nodeuuid: %s", nodeName, node.Status.NodeInfo.SystemUUID, vmMoMap) + vclib.VerifyVolumePathsForVM(vmMoMap[strings.ToLower(node.Status.NodeInfo.SystemUUID)], nodeVolumes[nodeName], convertToString(nodeName), attached) + } + return nodesToRetry, nil +} diff --git a/pkg/volume/vsphere_volume/attacher.go b/pkg/volume/vsphere_volume/attacher.go index 5b1879def48..fb9886beba2 100644 --- a/pkg/volume/vsphere_volume/attacher.go +++ b/pkg/volume/vsphere_volume/attacher.go @@ -76,7 +76,7 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No // vsphereCloud.AttachDisk checks if disk is already attached to host and // succeeds in that case, so no need to do that separately. - diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyID, nodeName) + diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyName, nodeName) if err != nil { glog.Errorf("Error attaching volume %q to node %q: %+v", volumeSource.VolumePath, nodeName, err) return "", err diff --git a/test/e2e/storage/persistent_volumes-vsphere.go b/test/e2e/storage/persistent_volumes-vsphere.go index a3be3de784d..1d49361f068 100644 --- a/test/e2e/storage/persistent_volumes-vsphere.go +++ b/test/e2e/storage/persistent_volumes-vsphere.go @@ -70,7 +70,7 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() { selector = metav1.SetAsLabelSelector(volLabel) if vsp == nil { - vsp, err = vsphere.GetVSphere() + vsp, err = getVSphere(c) Expect(err).NotTo(HaveOccurred()) } if volumePath == "" { @@ -105,7 +105,7 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() { node = types.NodeName(clientPod.Spec.NodeName) By("Verify disk should be attached to the node") - isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, node) + isAttached, err := verifyVSphereDiskAttached(c, vsp, volumePath, node) Expect(err).NotTo(HaveOccurred()) Expect(isAttached).To(BeTrue(), "disk is not attached with the node") }) @@ -133,7 +133,11 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() { framework.AddCleanupAction(func() { // Cleanup actions will be called even when the tests are skipped and leaves namespace unset. 
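// Illustrative sketch, not from the patch: checkDiskAttached above correlates
// Kubernetes nodes with vSphere VMs purely by UUID, since the VCP no longer
// depends on inventory names. vCenter's config.uuid and the kubelet-reported
// Status.NodeInfo.SystemUUID can differ in letter case, hence the ToLower on
// both sides of the lookup. The UUID and VM name below are made-up example values.
package main

import (
	"fmt"
	"strings"
)

// indexVMsByUUID builds a case-insensitive index from VM name -> config.uuid data.
func indexVMsByUUID(vmUUIDs map[string]string) map[string]string {
	idx := make(map[string]string, len(vmUUIDs))
	for vmName, uuid := range vmUUIDs {
		idx[strings.ToLower(uuid)] = vmName
	}
	return idx
}

func main() {
	idx := indexVMsByUUID(map[string]string{
		"kubernetes-node1": "4237D5FA-2E4A-88AB-9E51-03C0C4D5C1B5", // as reported by vCenter
	})
	systemUUID := "4237d5fa-2e4a-88ab-9e51-03c0c4d5c1b5" // as reported in node.Status.NodeInfo
	fmt.Println(idx[strings.ToLower(systemUUID)])        // kubernetes-node1
}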
if len(ns) > 0 && len(volumePath) > 0 { - framework.ExpectNoError(waitForVSphereDiskToDetach(vsp, volumePath, node)) + client, err := framework.LoadClientset() + if err != nil { + return + } + framework.ExpectNoError(waitForVSphereDiskToDetach(client, vsp, volumePath, node)) vsp.DeleteVolume(volumePath) } }) @@ -213,6 +217,6 @@ var _ = SIGDescribe("PersistentVolumes:vsphere", func() { Expect(err).NotTo(HaveOccurred()) By("Verifying Persistent Disk detaches") - waitForVSphereDiskToDetach(vsp, volumePath, node) + waitForVSphereDiskToDetach(c, vsp, volumePath, node) }) }) diff --git a/test/e2e/storage/pv_reclaimpolicy.go b/test/e2e/storage/pv_reclaimpolicy.go index b7415ec91a4..8713ce7d7eb 100644 --- a/test/e2e/storage/pv_reclaimpolicy.go +++ b/test/e2e/storage/pv_reclaimpolicy.go @@ -56,7 +56,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() { }) AfterEach(func() { - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) testCleanupVSpherePersistentVolumeReclaim(vsp, c, ns, volumePath, pv, pvc) }) @@ -74,7 +74,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() { 6. Verify PV is deleted automatically. */ It("should delete persistent volume when reclaimPolicy set to delete and associated claim is deleted", func() { - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) volumePath, pv, pvc, err = testSetupVSpherePersistentVolumeReclaim(vsp, c, ns, v1.PersistentVolumeReclaimDelete) @@ -104,7 +104,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() { 9. Verify PV should be detached from the node and automatically deleted. */ It("should not detach and unmount PV when associated pvc with delete as reclaimPolicy is deleted when it is in use by the pod", func() { - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) volumePath, pv, pvc, err = testSetupVSpherePersistentVolumeReclaim(vsp, c, ns, v1.PersistentVolumeReclaimDelete) @@ -127,19 +127,19 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() { Expect(framework.WaitForPersistentVolumePhase(v1.VolumeFailed, c, pv.Name, 1*time.Second, 60*time.Second)).NotTo(HaveOccurred()) By("Verify the volume is attached to the node") - isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(vsp, pv.Spec.VsphereVolume.VolumePath, node) + isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(c, vsp, pv.Spec.VsphereVolume.VolumePath, node) Expect(verifyDiskAttachedError).NotTo(HaveOccurred()) Expect(isVolumeAttached).To(BeTrue()) By("Verify the volume is accessible and available in the pod") - verifyVSphereVolumesAccessible(pod, []*v1.PersistentVolume{pv}, vsp) + verifyVSphereVolumesAccessible(c, pod, []*v1.PersistentVolume{pv}, vsp) framework.Logf("Verified that Volume is accessible in the POD after deleting PV claim") By("Deleting the Pod") framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod), "Failed to delete pod ", pod.Name) By("Verify PV is detached from the node after Pod is deleted") - Expect(waitForVSphereDiskToDetach(vsp, pv.Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName))).NotTo(HaveOccurred()) + Expect(waitForVSphereDiskToDetach(c, vsp, pv.Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName))).NotTo(HaveOccurred()) By("Verify PV should be deleted automatically") framework.ExpectNoError(framework.WaitForPersistentVolumeDeleted(c, pv.Name, 1*time.Second, 
30*time.Second)) @@ -167,7 +167,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:ReclaimPolicy]", func() { It("should retain persistent volume when reclaimPolicy set to retain when associated claim is deleted", func() { var volumeFileContent = "hello from vsphere cloud provider, Random Content is :" + strconv.FormatInt(time.Now().UnixNano(), 10) - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) volumePath, pv, pvc, err = testSetupVSpherePersistentVolumeReclaim(vsp, c, ns, v1.PersistentVolumeReclaimRetain) diff --git a/test/e2e/storage/pvc_label_selector.go b/test/e2e/storage/pvc_label_selector.go index 67842c7717e..d389386a6c7 100644 --- a/test/e2e/storage/pvc_label_selector.go +++ b/test/e2e/storage/pvc_label_selector.go @@ -23,7 +23,6 @@ import ( . "github.com/onsi/gomega" "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" - vsphere "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" "k8s.io/kubernetes/test/e2e/framework" ) @@ -104,7 +103,7 @@ var _ = SIGDescribe("PersistentVolumes [Feature:LabelSelector]", func() { func testSetupVSpherePVClabelselector(c clientset.Interface, ns string, ssdlabels map[string]string, vvollabels map[string]string) (volumePath string, pv_ssd *v1.PersistentVolume, pvc_ssd *v1.PersistentVolumeClaim, pvc_vvol *v1.PersistentVolumeClaim, err error) { volumePath = "" By("creating vmdk") - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) volumePath, err = createVSphereVolume(vsp, nil) if err != nil { @@ -134,7 +133,7 @@ func testSetupVSpherePVClabelselector(c clientset.Interface, ns string, ssdlabel func testCleanupVSpherePVClabelselector(c clientset.Interface, ns string, volumePath string, pv_ssd *v1.PersistentVolume, pvc_ssd *v1.PersistentVolumeClaim, pvc_vvol *v1.PersistentVolumeClaim) { By("running testCleanupVSpherePVClabelselector") if len(volumePath) > 0 { - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) vsp.DeleteVolume(volumePath) } diff --git a/test/e2e/storage/volumes.go b/test/e2e/storage/volumes.go index 61c42297ff6..da4ad0bc67c 100644 --- a/test/e2e/storage/volumes.go +++ b/test/e2e/storage/volumes.go @@ -53,7 +53,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/test/e2e/framework" ) @@ -507,7 +506,11 @@ var _ = SIGDescribe("Volumes", func() { Prefix: "vsphere", } By("creating a test vsphere volume") - vsp, err := vsphere.GetVSphere() + c, err := framework.LoadClientset() + if err != nil { + return + } + vsp, err := getVSphere(c) Expect(err).NotTo(HaveOccurred()) volumePath, err = createVSphereVolume(vsp, nil) diff --git a/test/e2e/storage/vsphere_scale.go b/test/e2e/storage/vsphere_scale.go index 810b70d6e33..ded690e2047 100644 --- a/test/e2e/storage/vsphere_scale.go +++ b/test/e2e/storage/vsphere_scale.go @@ -150,7 +150,7 @@ var _ = SIGDescribe("vcp at scale [Feature:vsphere] ", func() { scArrays[index] = sc } - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(client) Expect(err).NotTo(HaveOccurred()) volumeCountPerInstance := volumeCount / numberOfInstances @@ -176,7 +176,7 @@ var _ = SIGDescribe("vcp at scale [Feature:vsphere] ", func() { Expect(err).NotTo(HaveOccurred()) } By("Waiting for volumes to be detached from the node") - err = waitForVSphereDisksToDetach(vsp, 
nodeVolumeMap) + err = waitForVSphereDisksToDetach(client, vsp, nodeVolumeMap) Expect(err).NotTo(HaveOccurred()) for _, pvcClaim := range pvcClaimList { @@ -228,7 +228,7 @@ func VolumeCreateAndAttach(client clientset.Interface, namespace string, sc []*s nodeVolumeMap[pod.Spec.NodeName] = append(nodeVolumeMap[pod.Spec.NodeName], pv.Spec.VsphereVolume.VolumePath) } By("Verify the volume is accessible and available in the pod") - verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp) + verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp) nodeSelectorIndex++ } nodeVolumeMapChan <- nodeVolumeMap diff --git a/test/e2e/storage/vsphere_statefulsets.go b/test/e2e/storage/vsphere_statefulsets.go index c2823c72d85..b0a633ad391 100644 --- a/test/e2e/storage/vsphere_statefulsets.go +++ b/test/e2e/storage/vsphere_statefulsets.go @@ -24,7 +24,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" "k8s.io/kubernetes/test/e2e/framework" ) @@ -104,7 +103,7 @@ var _ = SIGDescribe("vsphere statefulset", func() { Expect(scaledownErr).NotTo(HaveOccurred()) statefulsetTester.WaitForStatusReadyReplicas(statefulset, replicas-1) - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(client) Expect(err).NotTo(HaveOccurred()) // After scale down, verify vsphere volumes are detached from deleted pods @@ -117,7 +116,7 @@ var _ = SIGDescribe("vsphere statefulset", func() { if volumespec.PersistentVolumeClaim != nil { vSpherediskPath := getvSphereVolumePathFromClaim(client, statefulset.Namespace, volumespec.PersistentVolumeClaim.ClaimName) framework.Logf("Waiting for Volume: %q to detach from Node: %q", vSpherediskPath, sspod.Spec.NodeName) - Expect(waitForVSphereDiskToDetach(vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName))).NotTo(HaveOccurred()) + Expect(waitForVSphereDiskToDetach(client, vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName))).NotTo(HaveOccurred()) } } } @@ -146,7 +145,7 @@ var _ = SIGDescribe("vsphere statefulset", func() { framework.Logf("Verify Volume: %q is attached to the Node: %q", vSpherediskPath, sspod.Spec.NodeName) // Verify scale up has re-attached the same volumes and not introduced new volume Expect(volumesBeforeScaleDown[vSpherediskPath] == "").To(BeFalse()) - isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName)) + isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(client, vsp, vSpherediskPath, types.NodeName(sspod.Spec.NodeName)) Expect(isVolumeAttached).To(BeTrue()) Expect(verifyDiskAttachedError).NotTo(HaveOccurred()) } diff --git a/test/e2e/storage/vsphere_stress.go b/test/e2e/storage/vsphere_stress.go index 4dcea07605d..4be0205e051 100644 --- a/test/e2e/storage/vsphere_stress.go +++ b/test/e2e/storage/vsphere_stress.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/types" k8stype "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" "k8s.io/kubernetes/test/e2e/framework" ) @@ -135,9 +134,8 @@ var _ = SIGDescribe("vsphere cloud provider stress [Feature:vsphere]", func() { func PerformVolumeLifeCycleInParallel(f *framework.Framework, client clientset.Interface, namespace string, instanceId string, sc *storageV1.StorageClass, iterations int, wg *sync.WaitGroup) { defer wg.Done() defer GinkgoRecover() - vsp, err := vsphere.GetVSphere() + vsp, 
err := getVSphere(f.ClientSet) Expect(err).NotTo(HaveOccurred()) - for iterationCount := 0; iterationCount < iterations; iterationCount++ { logPrefix := fmt.Sprintf("Instance: [%v], Iteration: [%v] :", instanceId, iterationCount+1) By(fmt.Sprintf("%v Creating PVC using the Storage Class: %v", logPrefix, sc.Name)) @@ -164,19 +162,19 @@ func PerformVolumeLifeCycleInParallel(f *framework.Framework, client clientset.I Expect(err).NotTo(HaveOccurred()) By(fmt.Sprintf("%v Verifing the volume: %v is attached to the node VM: %v", logPrefix, persistentvolumes[0].Spec.VsphereVolume.VolumePath, pod.Spec.NodeName)) - isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName)) + isVolumeAttached, verifyDiskAttachedError := verifyVSphereDiskAttached(client, vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, types.NodeName(pod.Spec.NodeName)) Expect(isVolumeAttached).To(BeTrue()) Expect(verifyDiskAttachedError).NotTo(HaveOccurred()) By(fmt.Sprintf("%v Verifing the volume: %v is accessible in the pod: %v", logPrefix, persistentvolumes[0].Spec.VsphereVolume.VolumePath, pod.Name)) - verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp) + verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp) By(fmt.Sprintf("%v Deleting pod: %v", logPrefix, pod.Name)) err = framework.DeletePodWithWait(f, client, pod) Expect(err).NotTo(HaveOccurred()) By(fmt.Sprintf("%v Waiting for volume: %v to be detached from the node: %v", logPrefix, persistentvolumes[0].Spec.VsphereVolume.VolumePath, pod.Spec.NodeName)) - err = waitForVSphereDiskToDetach(vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName)) + err = waitForVSphereDiskToDetach(client, vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName)) Expect(err).NotTo(HaveOccurred()) By(fmt.Sprintf("%v Deleting the Claim: %v", logPrefix, pvclaim.Name)) diff --git a/test/e2e/storage/vsphere_utils.go b/test/e2e/storage/vsphere_utils.go index 2215b6fa77d..b0c87c0cd71 100644 --- a/test/e2e/storage/vsphere_utils.go +++ b/test/e2e/storage/vsphere_utils.go @@ -55,13 +55,13 @@ const ( ) // Sanity check for vSphere testing. Verify the persistent disk attached to the node. 
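// Illustrative sketch, not from the patch: why the helpers in this file now take
// a clientset. The provider returned by getVSphere(c) (added further below) only
// knows about VMs for nodes that have been registered via NodeAdded, so the tests
// feed it the cluster's node list before calling DiskIsAttached and friends. A
// hypothetical test step using the updated signatures:
package storage

import (
	k8stype "k8s.io/apimachinery/pkg/types"
	clientset "k8s.io/client-go/kubernetes"
)

// verifyAttachedThenDetached assumes it lives in the same package as the helpers
// defined below (getVSphere, verifyVSphereDiskAttached, waitForVSphereDiskToDetach).
func verifyAttachedThenDetached(c clientset.Interface, volumePath string, node k8stype.NodeName) error {
	vsp, err := getVSphere(c) // constructs the VCP and registers all cluster nodes with it
	if err != nil {
		return err
	}
	if _, err := verifyVSphereDiskAttached(c, vsp, volumePath, node); err != nil {
		return err
	}
	return waitForVSphereDiskToDetach(c, vsp, volumePath, node)
}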
-func verifyVSphereDiskAttached(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) (bool, error) { +func verifyVSphereDiskAttached(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) (bool, error) { var ( isAttached bool err error ) if vsp == nil { - vsp, err = vsphere.GetVSphere() + vsp, err = getVSphere(c) Expect(err).NotTo(HaveOccurred()) } isAttached, err = vsp.DiskIsAttached(volumePath, nodeName) @@ -70,7 +70,7 @@ func verifyVSphereDiskAttached(vsp *vsphere.VSphere, volumePath string, nodeName } // Wait until vsphere volumes are detached from the list of nodes or time out after 5 minutes -func waitForVSphereDisksToDetach(vsp *vsphere.VSphere, nodeVolumes map[k8stype.NodeName][]string) error { +func waitForVSphereDisksToDetach(c clientset.Interface, vsp *vsphere.VSphere, nodeVolumes map[k8stype.NodeName][]string) error { var ( err error disksAttached = true @@ -78,7 +78,7 @@ func waitForVSphereDisksToDetach(vsp *vsphere.VSphere, nodeVolumes map[k8stype.N detachPollTime = 10 * time.Second ) if vsp == nil { - vsp, err = vsphere.GetVSphere() + vsp, err = getVSphere(c) if err != nil { return err } @@ -110,7 +110,7 @@ func waitForVSphereDisksToDetach(vsp *vsphere.VSphere, nodeVolumes map[k8stype.N } // Wait until vsphere vmdk moves to expected state on the given node, or time out after 6 minutes -func waitForVSphereDiskStatus(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName, expectedState volumeState) error { +func waitForVSphereDiskStatus(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName, expectedState volumeState) error { var ( err error diskAttached bool @@ -130,7 +130,7 @@ func waitForVSphereDiskStatus(vsp *vsphere.VSphere, volumePath string, nodeName } err = wait.Poll(pollTime, timeout, func() (bool, error) { - diskAttached, err = verifyVSphereDiskAttached(vsp, volumePath, nodeName) + diskAttached, err = verifyVSphereDiskAttached(c, vsp, volumePath, nodeName) if err != nil { return true, err } @@ -154,13 +154,13 @@ func waitForVSphereDiskStatus(vsp *vsphere.VSphere, volumePath string, nodeName } // Wait until vsphere vmdk is attached from the given node or time out after 6 minutes -func waitForVSphereDiskToAttach(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error { - return waitForVSphereDiskStatus(vsp, volumePath, nodeName, volumeStateAttached) +func waitForVSphereDiskToAttach(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error { + return waitForVSphereDiskStatus(c, vsp, volumePath, nodeName, volumeStateAttached) } // Wait until vsphere vmdk is detached from the given node or time out after 6 minutes -func waitForVSphereDiskToDetach(vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error { - return waitForVSphereDiskStatus(vsp, volumePath, nodeName, volumeStateDetached) +func waitForVSphereDiskToDetach(c clientset.Interface, vsp *vsphere.VSphere, volumePath string, nodeName types.NodeName) error { + return waitForVSphereDiskStatus(c, vsp, volumePath, nodeName, volumeStateDetached) } // function to create vsphere volume spec with given VMDK volume path, Reclaim Policy and labels @@ -414,12 +414,12 @@ func createEmptyFilesOnVSphereVolume(namespace string, podName string, filePaths } // verify volumes are attached to the node and are accessible in pod -func verifyVSphereVolumesAccessible(pod *v1.Pod, persistentvolumes []*v1.PersistentVolume, vsp *vsphere.VSphere) { +func verifyVSphereVolumesAccessible(c 
clientset.Interface, pod *v1.Pod, persistentvolumes []*v1.PersistentVolume, vsp *vsphere.VSphere) { nodeName := pod.Spec.NodeName namespace := pod.Namespace for index, pv := range persistentvolumes { // Verify disks are attached to the node - isAttached, err := verifyVSphereDiskAttached(vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName)) + isAttached, err := verifyVSphereDiskAttached(c, vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName)) Expect(err).NotTo(HaveOccurred()) Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk %v is not attached with the node", pv.Spec.VsphereVolume.VolumePath)) // Verify Volumes are accessible @@ -437,3 +437,23 @@ func getvSphereVolumePathFromClaim(client clientset.Interface, namespace string, Expect(err).NotTo(HaveOccurred()) return pv.Spec.VsphereVolume.VolumePath } + +func addNodesToVCP(vsp *vsphere.VSphere, c clientset.Interface) error { + nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return err + } + for _, node := range nodes.Items { + vsp.NodeAdded(&node) + } + return nil +} + +func getVSphere(c clientset.Interface) (*vsphere.VSphere, error) { + vsp, err := vsphere.GetVSphere() + if err != nil { + return nil, err + } + addNodesToVCP(vsp, c) + return vsp, nil +} diff --git a/test/e2e/storage/vsphere_volume_cluster_ds.go b/test/e2e/storage/vsphere_volume_cluster_ds.go index 75ef426f650..b2ebd29fb16 100644 --- a/test/e2e/storage/vsphere_volume_cluster_ds.go +++ b/test/e2e/storage/vsphere_volume_cluster_ds.go @@ -25,7 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib" "k8s.io/kubernetes/test/e2e/framework" ) @@ -69,7 +68,7 @@ var _ = SIGDescribe("Volume Provisioning On Clustered Datastore [Feature:vsphere It("verify static provisioning on clustered datastore", func() { var volumePath string - vsp, err := vsphere.GetVSphere() + vsp, err := getVSphere(client) Expect(err).NotTo(HaveOccurred()) By("creating a test vsphere volume") @@ -100,7 +99,7 @@ var _ = SIGDescribe("Volume Provisioning On Clustered Datastore [Feature:vsphere nodeName := types.NodeName(pod.Spec.NodeName) By("Verifying volume is attached") - isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, nodeName) + isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, nodeName) Expect(err).NotTo(HaveOccurred()) Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk: %s is not attached with the node: %v", volumePath, nodeName)) @@ -109,7 +108,7 @@ var _ = SIGDescribe("Volume Provisioning On Clustered Datastore [Feature:vsphere Expect(err).NotTo(HaveOccurred()) By("Waiting for volumes to be detached from the node") - err = waitForVSphereDiskToDetach(vsp, volumePath, nodeName) + err = waitForVSphereDiskToDetach(client, vsp, volumePath, nodeName) Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/storage/vsphere_volume_datastore.go b/test/e2e/storage/vsphere_volume_datastore.go index a80f22e9119..df86a96e5ef 100644 --- a/test/e2e/storage/vsphere_volume_datastore.go +++ b/test/e2e/storage/vsphere_volume_datastore.go @@ -68,7 +68,7 @@ var _ = SIGDescribe("Volume Provisioning on Datastore [Feature:vsphere]", func() scParameters[DiskFormat] = ThinDisk err := invokeInvalidDatastoreTestNeg(client, namespace, scParameters) Expect(err).To(HaveOccurred()) - errorMsg := `Failed to provision volume with StorageClass \"` + 
 		if !strings.Contains(err.Error(), errorMsg) {
 			Expect(err).NotTo(HaveOccurred(), errorMsg)
 		}
diff --git a/test/e2e/storage/vsphere_volume_diskformat.go b/test/e2e/storage/vsphere_volume_diskformat.go
index de1915b9905..b805eb4d5a1 100644
--- a/test/e2e/storage/vsphere_volume_diskformat.go
+++ b/test/e2e/storage/vsphere_volume_diskformat.go
@@ -145,9 +145,9 @@ func invokeTest(f *framework.Framework, client clientset.Interface, namespace st
 	pod, err := client.CoreV1().Pods(namespace).Create(podSpec)
 	Expect(err).NotTo(HaveOccurred())
 
-	vsp, err := vsphere.GetVSphere()
+	vsp, err := getVSphere(client)
 	Expect(err).NotTo(HaveOccurred())
-	verifyVSphereDiskAttached(vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName))
+	verifyVSphereDiskAttached(client, vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(nodeName))
 
 	By("Waiting for pod to be running")
 	Expect(framework.WaitForPodNameRunningInNamespace(client, pod.Name, namespace)).To(Succeed())
diff --git a/test/e2e/storage/vsphere_volume_fstype.go b/test/e2e/storage/vsphere_volume_fstype.go
index 7ace7eed6e4..352b6dd3935 100644
--- a/test/e2e/storage/vsphere_volume_fstype.go
+++ b/test/e2e/storage/vsphere_volume_fstype.go
@@ -97,7 +97,7 @@ func invokeTestForFstype(f *framework.Framework, client clientset.Interface, nam
 	framework.Logf("Invoking Test for fstype: %s", fstype)
 	scParameters := make(map[string]string)
 	scParameters["fstype"] = fstype
-	vsp, err := vsphere.GetVSphere()
+	vsp, err := getVSphere(client)
 	Expect(err).NotTo(HaveOccurred())
 
 	// Create Persistent Volume
@@ -117,7 +117,7 @@ func invokeTestForInvalidFstype(f *framework.Framework, client clientset.Interface, namespace string, fstype string) {
 	scParameters := make(map[string]string)
 	scParameters["fstype"] = fstype
-	vsp, err := vsphere.GetVSphere()
+	vsp, err := getVSphere(client)
 	Expect(err).NotTo(HaveOccurred())
 
 	// Create Persistent Volume
@@ -170,12 +170,12 @@ func createPodAndVerifyVolumeAccessible(client clientset.Interface, namespace st
 	pvclaims = append(pvclaims, pvclaim)
 	By("Creating pod to attach PV to the node")
 	// Create pod to attach Volume to Node
-	pod, err := framework.CreatePod(client, namespace, nil, pvclaims, false, "")
+	pod, err := framework.CreatePod(client, namespace, nil, pvclaims, false, ExecCommand)
 	Expect(err).NotTo(HaveOccurred())
 
 	// Asserts: Right disk is attached to the pod
 	By("Verify the volume is accessible and available in the pod")
-	verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
+	verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
 	return pod
 }
 
@@ -184,7 +184,7 @@ func detachVolume(f *framework.Framework, client clientset.Interface, vsp *vsphe
 	framework.DeletePodWithWait(f, client, pod)
 
 	By("Waiting for volumes to be detached from the node")
-	waitForVSphereDiskToDetach(vsp, volPath, k8stype.NodeName(pod.Spec.NodeName))
+	waitForVSphereDiskToDetach(client, vsp, volPath, k8stype.NodeName(pod.Spec.NodeName))
 }
 
 func deleteVolume(client clientset.Interface, pvclaimName string, namespace string) {
diff --git a/test/e2e/storage/vsphere_volume_master_restart.go b/test/e2e/storage/vsphere_volume_master_restart.go
index 36a0164aabe..5ba86162517 100644
--- a/test/e2e/storage/vsphere_volume_master_restart.go
+++ b/test/e2e/storage/vsphere_volume_master_restart.go
@@ -27,7 +27,6 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
 	"k8s.io/kubernetes/test/e2e/framework"
 )
 
@@ -79,7 +78,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
 	})
 
 	It("verify volume remains attached after master kubelet restart", func() {
-		vsp, err := vsphere.GetVSphere()
+		vsp, err := getVSphere(client)
 		Expect(err).NotTo(HaveOccurred())
 
 		// Create pod on each node
@@ -106,7 +105,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
 			nodeName := types.NodeName(pod.Spec.NodeName)
 
 			By(fmt.Sprintf("Verify volume %s is attached to the pod %v", volumePath, nodeName))
-			isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, types.NodeName(nodeName))
+			isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, types.NodeName(nodeName))
 			Expect(err).NotTo(HaveOccurred())
 			Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk: %s is not attached with the node", volumePath))
 
@@ -126,7 +125,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
 			nodeName := types.NodeName(pod.Spec.NodeName)
 
 			By(fmt.Sprintf("After master restart, verify volume %v is attached to the pod %v", volumePath, nodeName))
-			isAttached, err := verifyVSphereDiskAttached(vsp, volumePaths[i], types.NodeName(nodeName))
+			isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePaths[i], types.NodeName(nodeName))
 			Expect(err).NotTo(HaveOccurred())
 			Expect(isAttached).To(BeTrue(), fmt.Sprintf("disk: %s is not attached with the node", volumePath))
 
@@ -135,7 +134,7 @@ var _ = SIGDescribe("Volume Attach Verify [Feature:vsphere][Serial][Disruptive]"
 			Expect(err).NotTo(HaveOccurred())
 
 			By(fmt.Sprintf("Waiting for volume %s to be detached from the node %v", volumePath, nodeName))
-			err = waitForVSphereDiskToDetach(vsp, volumePath, types.NodeName(nodeName))
+			err = waitForVSphereDiskToDetach(client, vsp, volumePath, types.NodeName(nodeName))
 			Expect(err).NotTo(HaveOccurred())
 
 			By(fmt.Sprintf("Deleting volume %s", volumePath))
diff --git a/test/e2e/storage/vsphere_volume_node_poweroff.go b/test/e2e/storage/vsphere_volume_node_poweroff.go
index a28a06305f0..b902872edce 100644
--- a/test/e2e/storage/vsphere_volume_node_poweroff.go
+++ b/test/e2e/storage/vsphere_volume_node_poweroff.go
@@ -61,7 +61,7 @@ var _ = SIGDescribe("Node Poweroff [Feature:vsphere] [Slow] [Disruptive]", func(
 		nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
 		Expect(nodeList.Items).NotTo(BeEmpty(), "Unable to find ready and schedulable Node")
 		Expect(len(nodeList.Items) > 1).To(BeTrue(), "At least 2 nodes are required for this test")
-		vsp, err = vsphere.GetVSphere()
+		vsp, err = getVSphere(client)
 		Expect(err).NotTo(HaveOccurred())
 		workingDir = os.Getenv("VSPHERE_WORKING_DIR")
 		Expect(workingDir).NotTo(BeEmpty())
@@ -112,7 +112,7 @@ var _ = SIGDescribe("Node Poweroff [Feature:vsphere] [Slow] [Disruptive]", func(
 		node1 := types.NodeName(pod.Spec.NodeName)
 
 		By(fmt.Sprintf("Verify disk is attached to the node: %v", node1))
-		isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, node1)
+		isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, node1)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(isAttached).To(BeTrue(), "Disk is not attached to the node")
 
@@ -139,11 +139,11 @@ var _ = SIGDescribe("Node Poweroff [Feature:vsphere] [Slow] [Disruptive]", func(
 		Expect(err).NotTo(HaveOccurred(), "Pod did not fail over to a different node")
 
 		By(fmt.Sprintf("Waiting for disk to be attached to the new node: %v", node2))
-		err = waitForVSphereDiskToAttach(vsp, volumePath, node2)
+		err = waitForVSphereDiskToAttach(client, vsp, volumePath, node2)
 		Expect(err).NotTo(HaveOccurred(), "Disk is not attached to the node")
 
 		By(fmt.Sprintf("Waiting for disk to be detached from the previous node: %v", node1))
-		err = waitForVSphereDiskToDetach(vsp, volumePath, node1)
+		err = waitForVSphereDiskToDetach(client, vsp, volumePath, node1)
 		Expect(err).NotTo(HaveOccurred(), "Disk is not detached from the node")
 
 		By(fmt.Sprintf("Power on the previous node: %v", node1))
diff --git a/test/e2e/storage/vsphere_volume_ops_storm.go b/test/e2e/storage/vsphere_volume_ops_storm.go
index cfaca95fd84..b1b6516d0d9 100644
--- a/test/e2e/storage/vsphere_volume_ops_storm.go
+++ b/test/e2e/storage/vsphere_volume_ops_storm.go
@@ -75,7 +75,7 @@ var _ = SIGDescribe("Volume Operations Storm [Feature:vsphere]", func() {
 			volume_ops_scale = DEFAULT_VOLUME_OPS_SCALE
 		}
 		pvclaims = make([]*v1.PersistentVolumeClaim, volume_ops_scale)
-		vsp, err = vsphere.GetVSphere()
+		vsp, err = getVSphere(client)
 		Expect(err).NotTo(HaveOccurred())
 	})
 	AfterEach(func() {
@@ -113,14 +113,14 @@ var _ = SIGDescribe("Volume Operations Storm [Feature:vsphere]", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Verify all volumes are accessible and available in the pod")
-		verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
+		verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
 
 		By("Deleting pod")
 		framework.ExpectNoError(framework.DeletePodWithWait(f, client, pod))
 
 		By("Waiting for volumes to be detached from the node")
 		for _, pv := range persistentvolumes {
-			waitForVSphereDiskToDetach(vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
+			waitForVSphereDiskToDetach(client, vsp, pv.Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
 		}
 	})
 })
diff --git a/test/e2e/storage/vsphere_volume_perf.go b/test/e2e/storage/vsphere_volume_perf.go
index 06bcaa83c6b..59ca3951782 100644
--- a/test/e2e/storage/vsphere_volume_perf.go
+++ b/test/e2e/storage/vsphere_volume_perf.go
@@ -28,7 +28,6 @@ import (
 	storageV1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/types"
 	clientset "k8s.io/client-go/kubernetes"
-	"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
 	"k8s.io/kubernetes/test/e2e/framework"
 )
 
@@ -214,11 +213,11 @@ func invokeVolumeLifeCyclePerformance(f *framework.Framework, client clientset.I
 	latency[AttachOp] = elapsed.Seconds()
 
 	// Verify access to the volumes
-	vsp, err := vsphere.GetVSphere()
+	vsp, err := getVSphere(client)
 	Expect(err).NotTo(HaveOccurred())
 
 	for i, pod := range totalpods {
-		verifyVSphereVolumesAccessible(pod, totalpvs[i], vsp)
+		verifyVSphereVolumesAccessible(client, pod, totalpvs[i], vsp)
 	}
 
 	By("Deleting pods")
@@ -237,7 +236,7 @@ func invokeVolumeLifeCyclePerformance(f *framework.Framework, client clientset.I
 		}
 	}
 
-	err = waitForVSphereDisksToDetach(vsp, nodeVolumeMap)
+	err = waitForVSphereDisksToDetach(client, vsp, nodeVolumeMap)
 	Expect(err).NotTo(HaveOccurred())
 
 	By("Deleting the PVCs")
diff --git a/test/e2e/storage/vsphere_volume_placement.go b/test/e2e/storage/vsphere_volume_placement.go
index 6b056a994b6..417d0723b15 100644
--- a/test/e2e/storage/vsphere_volume_placement.go
+++ b/test/e2e/storage/vsphere_volume_placement.go
@@ -57,7 +57,7 @@ var _ = SIGDescribe("Volume Placement", func() {
 			isNodeLabeled = true
 		}
 		By("creating vmdk")
-		vsp, err = vsphere.GetVSphere()
+		vsp, err = getVSphere(c)
 		Expect(err).NotTo(HaveOccurred())
 		volumePath, err := createVSphereVolume(vsp, nil)
 		Expect(err).NotTo(HaveOccurred())
@@ -285,7 +285,7 @@ var _ = SIGDescribe("Volume Placement", func() {
 			framework.ExpectNoError(framework.DeletePodWithWait(f, c, podB), "defer: Failed to delete pod ", podB.Name)
 
 			By(fmt.Sprintf("wait for volumes to be detached from the node: %v", node1Name))
 			for _, volumePath := range volumePaths {
-				framework.ExpectNoError(waitForVSphereDiskToDetach(vsp, volumePath, types.NodeName(node1Name)))
+				framework.ExpectNoError(waitForVSphereDiskToDetach(c, vsp, volumePath, types.NodeName(node1Name)))
 			}
 		}()
@@ -362,7 +362,7 @@ func createPodWithVolumeAndNodeSelector(client clientset.Interface, namespace st
 
 	By(fmt.Sprintf("Verify volume is attached to the node:%v", nodeName))
 	for _, volumePath := range volumePaths {
-		isAttached, err := verifyVSphereDiskAttached(vsp, volumePath, types.NodeName(nodeName))
+		isAttached, err := verifyVSphereDiskAttached(client, vsp, volumePath, types.NodeName(nodeName))
 		Expect(err).NotTo(HaveOccurred())
 		Expect(isAttached).To(BeTrue(), "disk:"+volumePath+" is not attached with the node")
 	}
@@ -385,6 +385,6 @@ func deletePodAndWaitForVolumeToDetach(f *framework.Framework, c clientset.Inter
 
 	By("Waiting for volume to be detached from the node")
 	for _, volumePath := range volumePaths {
-		framework.ExpectNoError(waitForVSphereDiskToDetach(vsp, volumePath, types.NodeName(nodeName)))
+		framework.ExpectNoError(waitForVSphereDiskToDetach(c, vsp, volumePath, types.NodeName(nodeName)))
 	}
 }
diff --git a/test/e2e/storage/vsphere_volume_vsan_policy.go b/test/e2e/storage/vsphere_volume_vsan_policy.go
index f558e49c107..f3bef3f1c93 100644
--- a/test/e2e/storage/vsphere_volume_vsan_policy.go
+++ b/test/e2e/storage/vsphere_volume_vsan_policy.go
@@ -295,16 +295,16 @@ func invokeValidPolicyTest(f *framework.Framework, client clientset.Interface, n
 	pod, err := framework.CreatePod(client, namespace, nil, pvclaims, false, "")
 	Expect(err).NotTo(HaveOccurred())
 
-	vsp, err := vsphere.GetVSphere()
+	vsp, err := getVSphere(client)
 	Expect(err).NotTo(HaveOccurred())
 	By("Verify the volume is accessible and available in the pod")
-	verifyVSphereVolumesAccessible(pod, persistentvolumes, vsp)
+	verifyVSphereVolumesAccessible(client, pod, persistentvolumes, vsp)
 
 	By("Deleting pod")
 	framework.DeletePodWithWait(f, client, pod)
 
 	By("Waiting for volumes to be detached from the node")
-	waitForVSphereDiskToDetach(vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
+	waitForVSphereDiskToDetach(client, vsp, persistentvolumes[0].Spec.VsphereVolume.VolumePath, k8stype.NodeName(pod.Spec.NodeName))
 }
 
 func invokeInvalidPolicyTestNeg(client clientset.Interface, namespace string, scParameters map[string]string) error {
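Editor's note: for the multi-volume suites touched above (the ops-storm and perf tests), the detach wait is driven by a node-name-to-volume-paths map rather than a single path. The sketch below is an assumption about how such a map would be assembled and handed, together with the clientset, to waitForVSphereDisksToDetach; the name exampleWaitForAllDetach and the podPVs parameter are hypothetical, and the sketch again assumes it sits in the test/e2e/storage package with its existing imports.

// Hypothetical helper (not added by this patch), mirroring how the perf test
// builds its nodeVolumeMap before the bulk detach wait.
func exampleWaitForAllDetach(client clientset.Interface, vsp *vsphere.VSphere, pods []*v1.Pod, podPVs map[string][]*v1.PersistentVolume) error {
	nodeVolumeMap := make(map[k8stype.NodeName][]string)
	for _, pod := range pods {
		nodeName := k8stype.NodeName(pod.Spec.NodeName)
		for _, pv := range podPVs[pod.Name] {
			// Collect every VMDK path still expected on this node.
			nodeVolumeMap[nodeName] = append(nodeVolumeMap[nodeName], pv.Spec.VsphereVolume.VolumePath)
		}
	}
	// The clientset is threaded through because the helper falls back to
	// getVSphere(client), which re-registers the nodes, when vsp is nil.
	return waitForVSphereDisksToDetach(client, vsp, nodeVolumeMap)
}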