Merge pull request #52131 from vmware/BulkVerifyVolumesImplVsphere

Automatic merge from submit-queue (batch tested with PRs 50068, 52406, 52394, 48551, 52131). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Implement bulk polling of volumes for vSphere

This PR implements bulk polling of volumes, the BulkVerifyVolumes() API, for vSphere.

With the existing implementation, the vSphere cloud provider makes multiple calls to vCenter to check whether volumes are attached to nodes. If there are "N" volumes attached across "M" nodes, it makes "N" vCenter calls to verify the attachment of all "N" volumes. In addition, Kubernetes by default re-queries volume attachment every minute, which substantially increases the number of calls the vSphere cloud provider makes to vCenter.

In order to prevent this, the vSphere cloud provider implements the BulkVerifyVolumes() API, in which a single call is made to vCenter to check whether all the volumes are attached to their respective nodes. Irrespective of the number of volumes attached to nodes, each BulkVerifyVolumes() query from Kubernetes results in exactly one vCenter call.
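As a rough, illustrative sketch (not the exact Kubernetes plugin types), the bulk query takes a node-to-volume-paths map and returns a per-node, per-volume attachment map; the node name and paths below are made up:

```go
package main

import "fmt"

// bulkVerifyVolumes sketches the shape of the bulk check: one request covering
// every node and volume, one answer mapping node -> volume path -> attached.
// A real implementation would answer this with a single vCenter round trip
// instead of one call per volume.
func bulkVerifyVolumes(nodeVolumes map[string][]string) map[string]map[string]bool {
	attached := make(map[string]map[string]bool)
	for node, volPaths := range nodeVolumes {
		attached[node] = make(map[string]bool)
		for _, volPath := range volPaths {
			// Default to "not attached"; the vCenter query flips matches to true.
			attached[node][volPath] = false
		}
	}
	return attached
}

func main() {
	nodeVolumes := map[string][]string{
		"node-1": {"[vsanDatastore] kubevols/vol-a.vmdk", "[vsanDatastore] kubevols/vol-b.vmdk"},
	}
	fmt.Println(bulkVerifyVolumes(nodeVolumes))
}
```

The plugin-level signature added by this PR (see the attacher diff below) is `BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error)`.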

@rohitjogvmw @divyenpatel @luomiao

```release-note
BulkVerifyVolumes() implementation for vSphere
```
Commit 63cc42a861, authored by Kubernetes Submit Queue on 2017-09-23 20:55:54 -07:00 and committed by GitHub.
11 changed files with 351 additions and 122 deletions


@@ -19,6 +19,7 @@ package vclib
import (
"errors"
"fmt"
"path/filepath"
"strings"
"github.com/golang/glog"
@@ -142,6 +143,23 @@ func (dc *Datacenter) GetVMMoList(ctx context.Context, vmObjList []*VirtualMachi
return vmMoList, nil
}
// GetVirtualDiskPage83Data gets the virtual disk UUID by diskPath
func (dc *Datacenter) GetVirtualDiskPage83Data(ctx context.Context, diskPath string) (string, error) {
if len(diskPath) > 0 && filepath.Ext(diskPath) != ".vmdk" {
diskPath += ".vmdk"
}
vdm := object.NewVirtualDiskManager(dc.Client())
// Returns uuid of vmdk virtual disk
diskUUID, err := vdm.QueryVirtualDiskUuid(ctx, diskPath, dc.Datacenter)
if err != nil {
glog.Warningf("QueryVirtualDiskUuid failed for diskPath: %q. err: %+v", diskPath, err)
return "", err
}
diskUUID = formatVirtualDiskUUID(diskUUID)
return diskUUID, nil
}
// GetDatastoreMoList gets the Datastore Managed Objects with the given properties from the datastore objects
func (dc *Datacenter) GetDatastoreMoList(ctx context.Context, dsObjList []*Datastore, properties []string) ([]mo.Datastore, error) {
var dsMoList []mo.Datastore
@@ -162,3 +180,78 @@ func (dc *Datacenter) GetDatastoreMoList(ctx context.Context, dsObjList []*Datas
}
return dsMoList, nil
}
// CheckDisksAttached checks if the disk is attached to node.
// This is done by comparing the volume path with the backing.FilePath on the VM Virtual disk devices.
func (dc *Datacenter) CheckDisksAttached(ctx context.Context, nodeVolumes map[string][]string) (map[string]map[string]bool, error) {
attached := make(map[string]map[string]bool)
var vmList []*VirtualMachine
for nodeName, volPaths := range nodeVolumes {
for _, volPath := range volPaths {
setNodeVolumeMap(attached, volPath, nodeName, false)
}
vm, err := dc.GetVMByPath(ctx, nodeName)
if err != nil {
if IsNotFound(err) {
glog.Warningf("Node %q does not exist, vSphere CP will assume disks %v are not attached to it.", nodeName, volPaths)
}
continue
}
vmList = append(vmList, vm)
}
if len(vmList) == 0 {
glog.V(2).Infof("vSphere CP will assume no disks are attached to any node.")
return attached, nil
}
vmMoList, err := dc.GetVMMoList(ctx, vmList, []string{"config.hardware.device", "name"})
if err != nil {
// When there is an error fetching instance information
// it is safer to return nil and let volume information not be touched.
glog.Errorf("Failed to get VM Managed object for nodes: %+v. err: +%v", vmList, err)
return nil, err
}
for _, vmMo := range vmMoList {
if vmMo.Config == nil {
glog.Errorf("Config is not available for VM: %q", vmMo.Name)
continue
}
for nodeName, volPaths := range nodeVolumes {
if nodeName == vmMo.Name {
verifyVolumePathsForVM(vmMo, volPaths, attached)
}
}
}
return attached, nil
}
// verifyVolumePathsForVM verifies if the volume paths (volPaths) are attached to the VM.
func verifyVolumePathsForVM(vmMo mo.VirtualMachine, volPaths []string, nodeVolumeMap map[string]map[string]bool) {
// Verify if the volume paths are present on the VM backing virtual disk devices
for _, volPath := range volPaths {
vmDevices := object.VirtualDeviceList(vmMo.Config.Hardware.Device)
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
if backing.FileName == volPath {
setNodeVolumeMap(nodeVolumeMap, volPath, vmMo.Name, true)
}
}
}
}
}
}
func setNodeVolumeMap(
nodeVolumeMap map[string]map[string]bool,
volumePath string,
nodeName string,
check bool) {
volumeMap := nodeVolumeMap[nodeName]
if volumeMap == nil {
volumeMap = make(map[string]bool)
nodeVolumeMap[nodeName] = volumeMap
}
volumeMap[volumePath] = check
}
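For readers skimming the diff: every requested volume starts out marked false in the attachment map and is flipped to true only when a matching virtual-disk backing is found on the VM. A minimal standalone sketch of that pattern (node and path names are illustrative):

```go
package main

import "fmt"

// setNodeVolumeMap mirrors the helper above: it lazily creates the per-node map
// and records the attachment state for one volume path on one node.
func setNodeVolumeMap(nodeVolumeMap map[string]map[string]bool, volumePath, nodeName string, check bool) {
	volumeMap := nodeVolumeMap[nodeName]
	if volumeMap == nil {
		volumeMap = make(map[string]bool)
		nodeVolumeMap[nodeName] = volumeMap
	}
	volumeMap[volumePath] = check
}

func main() {
	attached := make(map[string]map[string]bool)
	// Everything defaults to "not attached"...
	setNodeVolumeMap(attached, "[ds1] kubevols/vol-a.vmdk", "node-1", false)
	// ...and is flipped to true only when a VM disk backing's FileName matches the path.
	setNodeVolumeMap(attached, "[ds1] kubevols/vol-a.vmdk", "node-1", true)
	fmt.Println(attached) // map[node-1:map[[ds1] kubevols/vol-a.vmdk:true]]
}
```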


@@ -35,7 +35,7 @@ type virtualDiskManager struct {
// Create implements Disk's Create interface
// Contains implementation of virtualDiskManager based Provisioning
-func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (err error) {
+func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (canonicalDiskPath string, err error) {
if diskManager.volumeOptions.SCSIControllerType == "" {
diskManager.volumeOptions.SCSIControllerType = vclib.LSILogicControllerType
}
@@ -57,15 +57,16 @@ func (diskManager virtualDiskManager) Create(ctx context.Context, datastore *vcl
if err != nil {
vclib.RecordvSphereMetric(vclib.APICreateVolume, requestTime, err)
glog.Errorf("Failed to create virtual disk: %s. err: %+v", diskManager.diskPath, err)
-return err
+return "", err
}
-err = task.Wait(ctx)
+taskInfo, err := task.WaitForResult(ctx, nil)
vclib.RecordvSphereMetric(vclib.APICreateVolume, requestTime, err)
if err != nil {
-glog.Errorf("Failed to create virtual disk: %s. err: %+v", diskManager.diskPath, err)
-return err
+glog.Errorf("Failed to complete virtual disk creation: %s. err: %+v", diskManager.diskPath, err)
+return "", err
}
-return nil
+canonicalDiskPath = taskInfo.Result.(string)
+return canonicalDiskPath, nil
}
// Delete implements Disk's Delete interface


@@ -39,7 +39,7 @@ const (
// VirtualDiskProvider defines interfaces for creating disk
type VirtualDiskProvider interface {
-Create(ctx context.Context, datastore *vclib.Datastore) error
+Create(ctx context.Context, datastore *vclib.Datastore) (string, error)
Delete(ctx context.Context, datastore *vclib.Datastore) error
}
@@ -60,16 +60,16 @@ func getDiskManager(disk *VirtualDisk, diskOperation string) VirtualDiskProvider
}
// Create gets appropriate disk manager and calls respective create method
-func (virtualDisk *VirtualDisk) Create(ctx context.Context, datastore *vclib.Datastore) error {
+func (virtualDisk *VirtualDisk) Create(ctx context.Context, datastore *vclib.Datastore) (string, error) {
if virtualDisk.VolumeOptions.DiskFormat == "" {
virtualDisk.VolumeOptions.DiskFormat = vclib.ThinDiskType
}
if !virtualDisk.VolumeOptions.VerifyVolumeOptions() {
glog.Error("VolumeOptions verification failed. volumeOptions: ", virtualDisk.VolumeOptions)
-return vclib.ErrInvalidVolumeOptions
+return "", vclib.ErrInvalidVolumeOptions
}
if virtualDisk.VolumeOptions.StoragePolicyID != "" && virtualDisk.VolumeOptions.StoragePolicyName != "" {
-return fmt.Errorf("Storage Policy ID and Storage Policy Name both set, Please set only one parameter")
+return "", fmt.Errorf("Storage Policy ID and Storage Policy Name both set, Please set only one parameter")
}
return getDiskManager(virtualDisk, VirtualDiskCreateOperation).Create(ctx, datastore)
}


@@ -37,33 +37,33 @@ type vmDiskManager struct {
// Create implements Disk's Create interface
// Contains implementation of VM based Provisioning to provision disk with SPBM Policy or VSANStorageProfileData
-func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (err error) {
+func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datastore) (canonicalDiskPath string, err error) {
if vmdisk.volumeOptions.SCSIControllerType == "" {
vmdisk.volumeOptions.SCSIControllerType = vclib.PVSCSIControllerType
}
pbmClient, err := vclib.NewPbmClient(ctx, datastore.Client())
if err != nil {
glog.Errorf("Error occurred while creating new pbmClient, err: %+v", err)
-return err
+return "", err
}
if vmdisk.volumeOptions.StoragePolicyID == "" && vmdisk.volumeOptions.StoragePolicyName != "" {
vmdisk.volumeOptions.StoragePolicyID, err = pbmClient.ProfileIDByName(ctx, vmdisk.volumeOptions.StoragePolicyName)
if err != nil {
glog.Errorf("Error occurred while getting Profile Id from Profile Name: %s, err: %+v", vmdisk.volumeOptions.StoragePolicyName, err)
-return err
+return "", err
}
}
if vmdisk.volumeOptions.StoragePolicyID != "" {
compatible, faultMessage, err := datastore.IsCompatibleWithStoragePolicy(ctx, vmdisk.volumeOptions.StoragePolicyID)
if err != nil {
glog.Errorf("Error occurred while checking datastore compatibility with storage policy id: %s, err: %+v", vmdisk.volumeOptions.StoragePolicyID, err)
-return err
+return "", err
}
if !compatible {
glog.Errorf("Datastore: %s is not compatible with Policy: %s", datastore.Name(), vmdisk.volumeOptions.StoragePolicyName)
-return fmt.Errorf("User specified datastore is not compatible with the storagePolicy: %q. Failed with faults: %+q", vmdisk.volumeOptions.StoragePolicyName, faultMessage)
+return "", fmt.Errorf("User specified datastore is not compatible with the storagePolicy: %q. Failed with faults: %+q", vmdisk.volumeOptions.StoragePolicyName, faultMessage)
}
}
@@ -76,11 +76,11 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
// Check Datastore type - VSANStorageProfileData is only applicable to vSAN Datastore
dsType, err := datastore.GetType(ctx)
if err != nil {
-return err
+return "", err
}
if dsType != vclib.VSANDatastoreType {
glog.Errorf("The specified datastore: %q is not a VSAN datastore", datastore.Name())
-return fmt.Errorf("The specified datastore: %q is not a VSAN datastore."+
+return "", fmt.Errorf("The specified datastore: %q is not a VSAN datastore."+
" The policy parameters will work only with VSAN Datastore."+
" So, please specify a valid VSAN datastore in Storage class definition.", datastore.Name())
}
@@ -91,7 +91,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
}
} else {
glog.Errorf("Both volumeOptions.StoragePolicyID and volumeOptions.VSANStorageProfileData are not set. One of them should be set")
-return fmt.Errorf("Both volumeOptions.StoragePolicyID and volumeOptions.VSANStorageProfileData are not set. One of them should be set")
+return "", fmt.Errorf("Both volumeOptions.StoragePolicyID and volumeOptions.VSANStorageProfileData are not set. One of them should be set")
}
var dummyVM *vclib.VirtualMachine
// Check if VM already exist in the folder.
@@ -106,7 +106,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
dummyVM, err = vmdisk.createDummyVM(ctx, datastore.Datacenter, dummyVMFullName)
if err != nil {
glog.Errorf("Failed to create Dummy VM. err: %v", err)
-return err
+return "", err
}
}
@@ -115,7 +115,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
disk, _, err := dummyVM.CreateDiskSpec(ctx, vmdisk.diskPath, datastore, vmdisk.volumeOptions)
if err != nil {
glog.Errorf("Failed to create Disk Spec. err: %v", err)
-return err
+return "", err
}
deviceConfigSpec := &types.VirtualDeviceConfigSpec{
Device: disk,
@@ -135,7 +135,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
glog.V(vclib.LogLevel).Info("File: %v already exists", vmdisk.diskPath)
} else {
glog.Errorf("Failed to attach the disk to VM: %q with err: %+v", dummyVMFullName, err)
-return err
+return "", err
}
}
// Detach the disk from the dummy VM.
@@ -146,7 +146,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
glog.V(vclib.LogLevel).Info("File: %v is already detached", vmdisk.diskPath)
} else {
glog.Errorf("Failed to detach the disk: %q from VM: %q with err: %+v", vmdisk.diskPath, dummyVMFullName, err)
-return err
+return "", err
}
}
// Delete the dummy VM
@@ -154,7 +154,7 @@ func (vmdisk vmDiskManager) Create(ctx context.Context, datastore *vclib.Datasto
if err != nil {
glog.Errorf("Failed to destroy the vm: %q with err: %+v", dummyVMFullName, err)
}
-return nil
+return vmdisk.diskPath, nil
}
func (vmdisk vmDiskManager) Delete(ctx context.Context, datastore *vclib.Datastore) error {


@@ -22,6 +22,7 @@ import (
"regexp"
"strings"
"github.com/golang/glog"
"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25/types"
@@ -130,3 +131,44 @@ func RemoveClusterFromVDiskPath(vDiskPath string) string {
}
return vDiskPath
}
// GetPathFromVMDiskPath retrieves the path from VM Disk Path.
// Example: For vmDiskPath - [vsanDatastore] kubevols/volume.vmdk, the path is kubevols/volume.vmdk
func GetPathFromVMDiskPath(vmDiskPath string) string {
datastorePathObj := new(object.DatastorePath)
isSuccess := datastorePathObj.FromString(vmDiskPath)
if !isSuccess {
glog.Errorf("Failed to parse vmDiskPath: %s", vmDiskPath)
return ""
}
return datastorePathObj.Path
}
// GetDatastoreFromVMDiskPath retrieves the datastore name from the VM disk path.
// Example: For vmDiskPath - [vsanDatastore] kubevols/volume.vmdk, the datastore is vsanDatastore
func GetDatastoreFromVMDiskPath(vmDiskPath string) string {
datastorePathObj := new(object.DatastorePath)
isSuccess := datastorePathObj.FromString(vmDiskPath)
if !isSuccess {
glog.Errorf("Failed to parse vmDiskPath: %s", vmDiskPath)
return ""
}
return datastorePathObj.Datastore
}
// GetDatastorePathObjFromVMDiskPath gets the datastorePathObj from the VM disk path.
func GetDatastorePathObjFromVMDiskPath(vmDiskPath string) (*object.DatastorePath, error) {
datastorePathObj := new(object.DatastorePath)
isSuccess := datastorePathObj.FromString(vmDiskPath)
if !isSuccess {
glog.Errorf("Failed to parse volPath: %s", vmDiskPath)
return nil, fmt.Errorf("Failed to parse volPath: %s", vmDiskPath)
}
return datastorePathObj, nil
}
// IsValidUUID checks if the string is a valid UUID.
func IsValidUUID(uuid string) bool {
r := regexp.MustCompile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$")
return r.MatchString(uuid)
}
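For context on these helpers, here is a small standalone sketch of how govmomi's object.DatastorePath splits a vSphere disk path into its datastore and folder-relative parts (the path below is illustrative):

```go
package main

import (
	"fmt"

	"github.com/vmware/govmomi/object"
)

func main() {
	// A vSphere disk path has the form "[datastore] folder/file.vmdk".
	vmDiskPath := "[vsanDatastore] kubevols/volume.vmdk"

	dsPath := new(object.DatastorePath)
	if !dsPath.FromString(vmDiskPath) {
		fmt.Println("failed to parse:", vmDiskPath)
		return
	}
	fmt.Println(dsPath.Datastore) // vsanDatastore
	fmt.Println(dsPath.Path)      // kubevols/volume.vmdk
}
```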


@@ -19,7 +19,6 @@ package vclib
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/golang/glog"
@@ -46,23 +45,6 @@ func (vm *VirtualMachine) IsDiskAttached(ctx context.Context, diskPath string) (
return false, nil
}
// GetVirtualDiskPage83Data gets the virtual disk UUID by diskPath
func (vm *VirtualMachine) GetVirtualDiskPage83Data(ctx context.Context, diskPath string) (string, error) {
if len(diskPath) > 0 && filepath.Ext(diskPath) != ".vmdk" {
diskPath += ".vmdk"
}
vdm := object.NewVirtualDiskManager(vm.Client())
// Returns uuid of vmdk virtual disk
diskUUID, err := vdm.QueryVirtualDiskUuid(ctx, diskPath, vm.Datacenter.Datacenter)
if err != nil {
glog.Errorf("QueryVirtualDiskUuid failed for diskPath: %q on VM: %q. err: %+v", diskPath, vm.InventoryPath, err)
return "", ErrNoDiskUUIDFound
}
diskUUID = formatVirtualDiskUUID(diskUUID)
return diskUUID, nil
}
// DeleteVM deletes the VM.
func (vm *VirtualMachine) DeleteVM(ctx context.Context) error {
destroyTask, err := vm.Destroy(ctx)
@@ -89,7 +71,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
}
// If disk is already attached, return the disk UUID
if attached {
-diskUUID, _ := vm.GetVirtualDiskPage83Data(ctx, vmDiskPath)
+diskUUID, _ := vm.Datacenter.GetVirtualDiskPage83Data(ctx, vmDiskPath)
return diskUUID, nil
}
@ -143,7 +125,7 @@ func (vm *VirtualMachine) AttachDisk(ctx context.Context, vmDiskPath string, vol
}
// Once disk is attached, get the disk UUID.
-diskUUID, err := vm.GetVirtualDiskPage83Data(ctx, vmDiskPath)
+diskUUID, err := vm.Datacenter.GetVirtualDiskPage83Data(ctx, vmDiskPath)
if err != nil {
glog.Errorf("Error occurred while getting Disk Info from VM: %q. err: %v", vm.InventoryPath, err)
vm.DetachDisk(ctx, vmDiskPath)
@@ -285,6 +267,25 @@ func (vm *VirtualMachine) CreateDiskSpec(ctx context.Context, diskPath string, d
return disk, newSCSIController, nil
}
// GetVirtualDiskPath gets the first available virtual disk devicePath from the VM
func (vm *VirtualMachine) GetVirtualDiskPath(ctx context.Context) (string, error) {
vmDevices, err := vm.Device(ctx)
if err != nil {
glog.Errorf("Failed to get the devices for VM: %q. err: %+v", vm.InventoryPath, err)
return "", err
}
// filter vm devices to retrieve device for the given vmdk file identified by disk path
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
return backing.FileName, nil
}
}
}
return "", nil
}
// createAndAttachSCSIController creates and attachs the SCSI controller to the VM.
func (vm *VirtualMachine) createAndAttachSCSIController(ctx context.Context, diskControllerType string) (types.BaseVirtualDevice, error) {
// Get VM device list
@ -322,24 +323,17 @@ func (vm *VirtualMachine) createAndAttachSCSIController(ctx context.Context, dis
// getVirtualDeviceByPath gets the virtual device by path
func (vm *VirtualMachine) getVirtualDeviceByPath(ctx context.Context, diskPath string) (types.BaseVirtualDevice, error) {
var diskUUID string
vmDevices, err := vm.Device(ctx)
if err != nil {
glog.Errorf("Failed to get the devices for VM: %q. err: %+v", vm.InventoryPath, err)
return nil, err
}
volumeUUID, err := vm.GetVirtualDiskPage83Data(ctx, diskPath)
if err != nil {
glog.Errorf("Failed to get disk UUID for path: %q on VM: %q. err: %+v", diskPath, vm.InventoryPath, err)
return nil, err
}
// filter vm devices to retrieve device for the given vmdk file identified by disk path
for _, device := range vmDevices {
if vmDevices.TypeName(device) == "VirtualDisk" {
virtualDevice := device.GetVirtualDevice()
if backing, ok := virtualDevice.Backing.(*types.VirtualDiskFlatVer2BackingInfo); ok {
-diskUUID = formatVirtualDiskUUID(backing.Uuid)
-if diskUUID == volumeUUID {
+if backing.FileName == diskPath {
return device, nil
}
}


@@ -56,6 +56,7 @@ const (
)
var cleanUpRoutineInitialized = false
var datastoreFolderIDMap = make(map[string]map[string]string)
var clientLock sync.Mutex
var cleanUpRoutineInitLock sync.Mutex
@@ -127,7 +128,7 @@ type Volumes interface {
// DisksAreAttached checks if a list disks are attached to the given node.
// Assumption: If node doesn't exist, disks are not attached to the node.
-DisksAreAttached(volPath []string, nodeName k8stypes.NodeName) (map[string]bool, error)
+DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error)
// CreateVolume creates a new vmdk with specified parameters.
CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath string, err error)
@@ -570,19 +571,12 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName k8stypes.NodeName) (b
}
// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin.
-func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
-disksAreAttachedInternal := func(volPaths []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
-attached := make(map[string]bool)
-if len(volPaths) == 0 {
+func (vs *VSphere) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
+disksAreAttachedInternal := func(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
+attached := make(map[k8stypes.NodeName]map[string]bool)
+if len(nodeVolumes) == 0 {
return attached, nil
}
var vSphereInstance string
if nodeName == "" {
vSphereInstance = vs.localInstanceID
nodeName = vmNameToNodeName(vSphereInstance)
} else {
vSphereInstance = nodeNameToVMName(nodeName)
}
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -591,41 +585,40 @@ func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeNam
if err != nil {
return nil, err
}
-vm, err := vs.getVMByName(ctx, nodeName)
+dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter)
if err != nil {
if vclib.IsNotFound(err) {
glog.Warningf("Node %q does not exist, vsphere CP will assume all disks %v are not attached to it.", nodeName, volPaths)
// make all the disks as detached and return false without error.
attached := make(map[string]bool)
for _, volPath := range volPaths {
attached[volPath] = false
}
return attached, nil
}
glog.Errorf("Failed to get VM object for node: %q. err: +%v", vSphereInstance, err)
return nil, err
}
-for _, volPath := range volPaths {
-result, err := vm.IsDiskAttached(ctx, volPath)
-if err == nil {
-if result {
-attached[volPath] = true
-} else {
-attached[volPath] = false
-}
-} else {
-glog.Errorf("DisksAreAttached failed to determine whether disk %q from volPaths %+v is still attached on node %q",
-volPath,
-volPaths,
-vSphereInstance)
+vmVolumes := make(map[string][]string)
+for nodeName, volPaths := range nodeVolumes {
+for i, volPath := range volPaths {
+// Get the canonical volume path for volPath.
+canonicalVolumePath, err := getcanonicalVolumePath(ctx, dc, volPath)
+if err != nil {
+glog.Errorf("Failed to get canonical vsphere volume path for volume: %s. err: %+v", volPath, err)
return nil, err
}
// Check if the volume path contains .vmdk extension. If not, add the extension and update the nodeVolumes Map
if len(canonicalVolumePath) > 0 && filepath.Ext(canonicalVolumePath) != ".vmdk" {
canonicalVolumePath += ".vmdk"
}
volPaths[i] = canonicalVolumePath
}
vmVolumes[nodeNameToVMName(nodeName)] = volPaths
}
// Check if the disks are attached to their respective nodes
disksAttachedList, err := dc.CheckDisksAttached(ctx, vmVolumes)
if err != nil {
return nil, err
}
for vmName, volPaths := range disksAttachedList {
attached[vmNameToNodeName(vmName)] = volPaths
} }
return attached, nil
}
requestTime := time.Now()
-attached, err := disksAreAttachedInternal(volPaths, nodeName)
+attached, err := disksAreAttachedInternal(nodeVolumes)
vclib.RecordvSphereMetric(vclib.OperationDisksAreAttached, requestTime, err)
return attached, err
}
@@ -634,9 +627,9 @@ func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName k8stypes.NodeNam
// If the volumeOptions.Datastore is part of datastore cluster for example - [DatastoreCluster/sharedVmfs-0] then
// return value will be [DatastoreCluster/sharedVmfs-0] kubevols/<volume-name>.vmdk
// else return value will be [sharedVmfs-0] kubevols/<volume-name>.vmdk
-func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath string, err error) {
+func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
glog.V(1).Infof("Starting to create a vSphere volume with volumeOptions: %+v", volumeOptions)
-createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (volumePath string, err error) {
+createVolumeInternal := func(volumeOptions *vclib.VolumeOptions) (canonicalVolumePath string, err error) {
var datastore string
// Default datastore is the datastore in the vSphere config file that is used to initialize vSphere cloud provider.
if volumeOptions.Datastore == "" {
@@ -644,6 +637,7 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath
} else {
datastore = volumeOptions.Datastore
}
datastore = strings.TrimSpace(datastore)
// Create context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -694,27 +688,34 @@ func (vs *VSphere) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath
glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err) glog.Errorf("Cannot create dir %#v. err %s", kubeVolsPath, err)
return "", err return "", err
} }
volumePath = kubeVolsPath + volumeOptions.Name + ".vmdk" volumePath := kubeVolsPath + volumeOptions.Name + ".vmdk"
disk := diskmanagers.VirtualDisk{ disk := diskmanagers.VirtualDisk{
DiskPath: volumePath, DiskPath: volumePath,
VolumeOptions: volumeOptions, VolumeOptions: volumeOptions,
VMOptions: vmOptions, VMOptions: vmOptions,
} }
err = disk.Create(ctx, ds) volumePath, err = disk.Create(ctx, ds)
if err != nil { if err != nil {
glog.Errorf("Failed to create a vsphere volume with volumeOptions: %+v on datastore: %s. err: %+v", volumeOptions, datastore, err) glog.Errorf("Failed to create a vsphere volume with volumeOptions: %+v on datastore: %s. err: %+v", volumeOptions, datastore, err)
return "", err return "", err
} }
// Get the canonical path for the volume path.
canonicalVolumePath, err = getcanonicalVolumePath(ctx, dc, volumePath)
if err != nil {
glog.Errorf("Failed to get canonical vsphere volume path for volume: %s with volumeOptions: %+v on datastore: %s. err: %+v", volumePath, volumeOptions, datastore, err)
return "", err
}
if filepath.Base(datastore) != datastore {
// If datastore is within cluster, add cluster path to the volumePath
-volumePath = strings.Replace(volumePath, filepath.Base(datastore), datastore, 1)
+canonicalVolumePath = strings.Replace(canonicalVolumePath, filepath.Base(datastore), datastore, 1)
}
-return volumePath, nil
+return canonicalVolumePath, nil
}
requestTime := time.Now()
-volumePath, err = createVolumeInternal(volumeOptions)
+canonicalVolumePath, err = createVolumeInternal(volumeOptions)
vclib.RecordCreateVolumeMetric(volumeOptions, requestTime, err)
-return volumePath, err
+glog.V(1).Infof("The canonical volume path for the newly created vSphere volume is %q", canonicalVolumePath)
return canonicalVolumePath, err
}
// DeleteVolume deletes a volume given volume name.


@@ -21,6 +21,7 @@ import (
"errors"
"io/ioutil"
"os"
"regexp"
"runtime"
"strings"
"time"
@@ -42,6 +43,7 @@ const (
DatastoreInfoProperty = "info"
Folder = "Folder"
VirtualMachine = "VirtualMachine"
DummyDiskName = "kube-dummyDisk.vmdk"
)
// GetVSphere reads vSphere configuration from system environment and construct vSphere object
@@ -293,3 +295,61 @@ func (vs *VSphere) cleanUpDummyVMs(dummyVMPrefix string) {
}
}
}
// Get canonical volume path for volume Path.
// Example1: The canonical path for volume path - [vsanDatastore] kubevols/volume.vmdk will be [vsanDatastore] 25d8b159-948c-4b73-e499-02001ad1b044/volume.vmdk
// Example2: The canonical path for volume path - [vsanDatastore] 25d8b159-948c-4b73-e499-02001ad1b044/volume.vmdk will be same as volume Path.
func getcanonicalVolumePath(ctx context.Context, dc *vclib.Datacenter, volumePath string) (string, error) {
var folderID string
var folderExists bool
canonicalVolumePath := volumePath
dsPathObj, err := vclib.GetDatastorePathObjFromVMDiskPath(volumePath)
if err != nil {
return "", err
}
dsPath := strings.Split(strings.TrimSpace(dsPathObj.Path), "/")
if len(dsPath) <= 1 {
return canonicalVolumePath, nil
}
datastore := dsPathObj.Datastore
dsFolder := dsPath[0]
folderNameIDMap, datastoreExists := datastoreFolderIDMap[datastore]
if datastoreExists {
folderID, folderExists = folderNameIDMap[dsFolder]
}
// Get the datastore folder ID if datastore or folder doesn't exist in datastoreFolderIDMap
if !datastoreExists || !folderExists {
if !vclib.IsValidUUID(dsFolder) {
dummyDiskVolPath := "[" + datastore + "] " + dsFolder + "/" + DummyDiskName
// Querying a non-existent dummy disk on the datastore folder.
// It would fail and return a folder ID in the error message.
_, err := dc.GetVirtualDiskPage83Data(ctx, dummyDiskVolPath)
if err != nil {
re := regexp.MustCompile("File (.*?) was not found")
match := re.FindStringSubmatch(err.Error())
canonicalVolumePath = match[1]
}
}
diskPath := vclib.GetPathFromVMDiskPath(canonicalVolumePath)
if diskPath == "" {
return "", fmt.Errorf("Failed to parse canonicalVolumePath: %s in getcanonicalVolumePath method", canonicalVolumePath)
}
folderID = strings.Split(strings.TrimSpace(diskPath), "/")[0]
setdatastoreFolderIDMap(datastoreFolderIDMap, datastore, dsFolder, folderID)
}
canonicalVolumePath = strings.Replace(volumePath, dsFolder, folderID, 1)
return canonicalVolumePath, nil
}
func setdatastoreFolderIDMap(
datastoreFolderIDMap map[string]map[string]string,
datastore string,
folderName string,
folderID string) {
folderNameIDMap := datastoreFolderIDMap[datastore]
if folderNameIDMap == nil {
folderNameIDMap = make(map[string]string)
datastoreFolderIDMap[datastore] = folderNameIDMap
}
folderNameIDMap[folderName] = folderID
}
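To make the folder-ID substitution concrete, here is a small hedged sketch of the string transformation getcanonicalVolumePath performs once the folder ID is known (the UUID and paths are illustrative, not taken from a real datastore):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Illustrative values only: a volume created under a named datastore folder,
	// and that folder's backing UUID as vCenter would report it.
	volumePath := "[vsanDatastore] kubevols/volume.vmdk"
	folderName := "kubevols"
	folderID := "25d8b159-948c-4b73-e499-02001ad1b044"

	// Replacing the folder name with the folder ID yields the canonical form
	// that vCenter uses internally for the same file.
	canonical := strings.Replace(volumePath, folderName, folderID, 1)
	fmt.Println(canonical) // [vsanDatastore] 25d8b159-948c-4b73-e499-02001ad1b044/volume.vmdk
}
```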


@@ -86,34 +86,57 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No
}
func (attacher *vsphereVMDKAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
-volumesAttachedCheck := make(map[*volume.Spec]bool)
-volumeSpecMap := make(map[string]*volume.Spec)
-volumePathList := []string{}
-for _, spec := range specs {
-volumeSource, _, err := getVolumeSource(spec)
+glog.Warningf("Attacher.VolumesAreAttached called for node %q - Please use BulkVerifyVolumes for vSphere", nodeName)
+volumeNodeMap := map[types.NodeName][]*volume.Spec{
+nodeName: specs,
+}
+nodeVolumesResult := make(map[*volume.Spec]bool)
nodesVerificationMap, err := attacher.BulkVerifyVolumes(volumeNodeMap)
if err != nil {
-glog.Errorf("Error getting volume (%q) source : %v", spec.Name(), err)
+glog.Errorf("Attacher.VolumesAreAttached - error checking volumes for node %q with %v", nodeName, err)
return nodeVolumesResult, err
}
if result, ok := nodesVerificationMap[nodeName]; ok {
return result, nil
}
return nodeVolumesResult, nil
}
func (attacher *vsphereVMDKAttacher) BulkVerifyVolumes(volumesByNode map[types.NodeName][]*volume.Spec) (map[types.NodeName]map[*volume.Spec]bool, error) {
volumesAttachedCheck := make(map[types.NodeName]map[*volume.Spec]bool)
volumePathsByNode := make(map[types.NodeName][]string)
volumeSpecMap := make(map[string]*volume.Spec)
for nodeName, volumeSpecs := range volumesByNode {
for _, volumeSpec := range volumeSpecs {
volumeSource, _, err := getVolumeSource(volumeSpec)
if err != nil {
glog.Errorf("Error getting volume (%q) source : %v", volumeSpec.Name(), err)
continue
}
-volumePathList = append(volumePathList, volumeSource.VolumePath)
-volumeSpecMap[volumeSource.VolumePath] = spec
+volPath := volumeSource.VolumePath
+volumePathsByNode[nodeName] = append(volumePathsByNode[nodeName], volPath)
nodeVolume, nodeVolumeExists := volumesAttachedCheck[nodeName]
if !nodeVolumeExists {
nodeVolume = make(map[*volume.Spec]bool)
}
-attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathList, nodeName)
+nodeVolume[volumeSpec] = true
volumeSpecMap[volPath] = volumeSpec
volumesAttachedCheck[nodeName] = nodeVolume
}
}
attachedResult, err := attacher.vsphereVolumes.DisksAreAttached(volumePathsByNode)
if err != nil {
-glog.Errorf(
-"Error checking if volumes (%v) are attached to current node (%q). err=%v",
-volumePathList, nodeName, err)
-return nil, err
+glog.Errorf("Error checking if volumes are attached to nodes: %+v. err: %v", volumePathsByNode, err)
+return volumesAttachedCheck, err
}
-for volumePath, attached := range attachedResult {
-spec := volumeSpecMap[volumePath]
-if !attached {
-volumesAttachedCheck[spec] = false
-glog.V(2).Infof("VolumesAreAttached: volume %q (specName: %q) is no longer attached", volumePath, spec.Name())
-} else {
-volumesAttachedCheck[spec] = true
-glog.V(2).Infof("VolumesAreAttached: volume %q (specName: %q) is attached", volumePath, spec.Name())
+for nodeName, nodeVolumes := range attachedResult {
+for volumePath, attached := range nodeVolumes {
+if !attached {
+spec := volumeSpecMap[volumePath]
+setNodeVolume(volumesAttachedCheck, spec, nodeName, false)
+}
}
}
return volumesAttachedCheck, nil
@@ -257,3 +280,17 @@ func (detacher *vsphereVMDKDetacher) Detach(deviceMountPath string, nodeName typ
func (detacher *vsphereVMDKDetacher) UnmountDevice(deviceMountPath string) error {
return volumeutil.UnmountPath(deviceMountPath, detacher.mounter)
}
func setNodeVolume(
nodeVolumeMap map[types.NodeName]map[*volume.Spec]bool,
volumeSpec *volume.Spec,
nodeName types.NodeName,
check bool) {
volumeMap := nodeVolumeMap[nodeName]
if volumeMap == nil {
volumeMap = make(map[*volume.Spec]bool)
nodeVolumeMap[nodeName] = volumeMap
}
volumeMap[volumeSpec] = check
}


@@ -21,6 +21,7 @@ import (
"testing"
"k8s.io/api/core/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@@ -308,7 +309,7 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam
return expected.isAttached, expected.ret
}
-func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
+func (testcase *testcase) DisksAreAttached(nodeVolumes map[k8stypes.NodeName][]string) (map[k8stypes.NodeName]map[string]bool, error) {
return nil, errors.New("Not implemented")
}


@@ -85,7 +85,7 @@ func (plugin *vsphereVolumePlugin) SupportsMountOption() bool {
}
func (plugin *vsphereVolumePlugin) SupportsBulkVolumeVerification() bool {
-return false
+return true
}
func (plugin *vsphereVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {