vSphere Cloud Provider code refactoring

This commit is contained in:
Balu Dontu 2017-08-02 08:05:23 -07:00
parent 210d61fb03
commit f4e39933f6
9 changed files with 470 additions and 1828 deletions

View File

@ -12,32 +12,24 @@ go_library(
name = "go_default_library",
srcs = [
"vsphere.go",
"vsphere_metrics.go",
"vsphere_util.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1/helper:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/vsphere/vclib:go_default_library",
"//pkg/cloudprovider/providers/vsphere/vclib/diskmanagers:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/vmware/govmomi:go_default_library",
"//vendor/github.com/vmware/govmomi/find:go_default_library",
"//vendor/github.com/vmware/govmomi/object:go_default_library",
"//vendor/github.com/vmware/govmomi/pbm:go_default_library",
"//vendor/github.com/vmware/govmomi/pbm/types:go_default_library",
"//vendor/github.com/vmware/govmomi/property:go_default_library",
"//vendor/github.com/vmware/govmomi/session:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25/mo:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25/soap:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25/types:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/gopkg.in/gcfg.v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)
@ -48,6 +40,7 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/vsphere/vclib:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
@ -63,6 +56,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//pkg/cloudprovider/providers/vsphere/vclib:all-srcs",
],
tags = ["automanaged"],
)

File diff suppressed because it is too large Load Diff

View File

@ -1,127 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere
import (
"github.com/prometheus/client_golang/prometheus"
"time"
)
// Names of the individual vSphere API calls whose latency and error
// counts are recorded via recordvSphereAPIMetric.
const (
api_createvolume = "CreateVolume"
api_deletevolume = "DeleteVolume"
api_attachvolume = "AttachVolume"
api_detachvolume = "DetachVolume"
)
// Names of the higher-level vSphere operations (each of which may invoke
// several API calls) recorded via recordvSphereOperationMetric. The three
// create-volume variants distinguish plain creation from creation with a
// named storage policy or with raw VSAN policy data.
const (
operation_deletevolume = "DeleteVolumeOperation"
operation_attachvolume = "AttachVolumeOperation"
operation_detachvolume = "DetachVolumeOperation"
operation_diskIsAttached = "DiskIsAttachedOperation"
operation_disksAreAttached = "DisksAreAttachedOperation"
operation_createvolume = "CreateVolumeOperation"
operation_createvolume_with_policy = "CreateVolumeWithPolicyOperation"
operation_createvolume_with_raw_vsan_policy = "CreateVolumeWithRawVSANPolicyOperation"
)
// vsphereApiMetric is for recording latency of Single API Call.
// Labeled by the API name (see the api_* constants).
var vsphereApiMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "cloudprovider_vsphere_api_request_duration_seconds",
Help: "Latency of vsphere api call",
},
[]string{"request"},
)

// vsphereApiErrorMetric counts failed vSphere API calls, labeled by the
// API name. On error only this counter is bumped; latency is not observed.
var vsphereApiErrorMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cloudprovider_vsphere_api_request_errors",
Help: "vsphere Api errors",
},
[]string{"request"},
)
// vsphereOperationMetric is for recording latency of vSphere Operation which invokes multiple APIs to get the task done.
// Labeled by the operation name (see the operation_* constants).
var vsphereOperationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "cloudprovider_vsphere_operation_duration_seconds",
Help: "Latency of vsphere operation call",
},
[]string{"operation"},
)

// vsphereOperationErrorMetric counts failed vSphere operations, labeled by
// the operation name. On error only this counter is bumped; latency is not
// observed.
var vsphereOperationErrorMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cloudprovider_vsphere_operation_errors",
Help: "vsphere operation errors",
},
[]string{"operation"},
)
// registerMetrics registers every vSphere cloud-provider metric with the
// default Prometheus registry. Panics (via MustRegister) on duplicate
// registration, so call it exactly once.
func registerMetrics() {
	prometheus.MustRegister(
		vsphereApiMetric,
		vsphereApiErrorMetric,
		vsphereOperationMetric,
		vsphereOperationErrorMetric,
	)
}
// recordvSphereMetric dispatches a measurement to the API-level or the
// operation-level metric family depending on actionName. Names matching one
// of the api_* constants are recorded as single API calls; everything else
// is treated as a multi-call operation.
func recordvSphereMetric(actionName string, requestTime time.Time, err error) {
	isAPICall := actionName == api_createvolume ||
		actionName == api_deletevolume ||
		actionName == api_attachvolume ||
		actionName == api_detachvolume
	if isAPICall {
		recordvSphereAPIMetric(actionName, requestTime, err)
	} else {
		recordvSphereOperationMetric(actionName, requestTime, err)
	}
}
func recordvSphereAPIMetric(actionName string, requestTime time.Time, err error) {
if err != nil {
vsphereApiErrorMetric.With(prometheus.Labels{"request": actionName}).Inc()
} else {
vsphereApiMetric.With(prometheus.Labels{"request": actionName}).Observe(calculateTimeTaken(requestTime))
}
}
func recordvSphereOperationMetric(actionName string, requestTime time.Time, err error) {
if err != nil {
vsphereOperationErrorMetric.With(prometheus.Labels{"operation": actionName}).Inc()
} else {
vsphereOperationMetric.With(prometheus.Labels{"operation": actionName}).Observe(calculateTimeTaken(requestTime))
}
}
// recordCreateVolumeMetric records a create-volume measurement under the
// operation name matching how the volume was requested: with a named
// storage policy, with raw VSAN policy data, or plain. StoragePolicyName
// takes precedence when both are set, mirroring the original check order.
func recordCreateVolumeMetric(volumeOptions *VolumeOptions, requestTime time.Time, err error) {
	actionName := operation_createvolume
	switch {
	case volumeOptions.StoragePolicyName != "":
		actionName = operation_createvolume_with_policy
	case volumeOptions.VSANStorageProfileData != "":
		actionName = operation_createvolume_with_raw_vsan_policy
	}
	recordvSphereMetric(actionName, requestTime, err)
}
func calculateTimeTaken(requestBeginTime time.Time) (timeTaken float64) {
if !requestBeginTime.IsZero() {
timeTaken = time.Since(requestBeginTime).Seconds()
} else {
timeTaken = 0
}
return timeTaken
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
)
func configFromEnv() (cfg VSphereConfig, ok bool) {
@ -125,11 +126,11 @@ func TestVSphereLogin(t *testing.T) {
defer cancel()
// Create vSphere client
err = vSphereLogin(ctx, vs)
err = vs.conn.Connect(ctx)
if err != nil {
t.Errorf("Failed to create vSpere client: %s", err)
t.Errorf("Failed to connect to vSphere: %s", err)
}
defer vs.client.Logout(ctx)
defer vs.conn.GoVmomiClient.Logout(ctx)
}
func TestZones(t *testing.T) {
@ -168,14 +169,14 @@ func TestInstances(t *testing.T) {
t.Fatalf("CurrentNodeName() failed: %s", err)
}
externalId, err := i.ExternalID(nodeName)
externalID, err := i.ExternalID(nodeName)
if err != nil {
t.Fatalf("Instances.ExternalID(%s) failed: %s", nodeName, err)
}
t.Logf("Found ExternalID(%s) = %s\n", nodeName, externalId)
t.Logf("Found ExternalID(%s) = %s\n", nodeName, externalID)
nonExistingVM := types.NodeName(rand.String(15))
externalId, err = i.ExternalID(nonExistingVM)
externalID, err = i.ExternalID(nonExistingVM)
if err == cloudprovider.InstanceNotFound {
t.Logf("VM %s was not found as expected\n", nonExistingVM)
} else if err == nil {
@ -184,13 +185,13 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances.ExternalID did not fail as expected, err: %v", err)
}
instanceId, err := i.InstanceID(nodeName)
instanceID, err := i.InstanceID(nodeName)
if err != nil {
t.Fatalf("Instances.InstanceID(%s) failed: %s", nodeName, err)
}
t.Logf("Found InstanceID(%s) = %s\n", nodeName, instanceId)
t.Logf("Found InstanceID(%s) = %s\n", nodeName, instanceID)
instanceId, err = i.InstanceID(nonExistingVM)
instanceID, err = i.InstanceID(nonExistingVM)
if err == cloudprovider.InstanceNotFound {
t.Logf("VM %s was not found as expected\n", nonExistingVM)
} else if err == nil {
@ -222,7 +223,7 @@ func TestVolumes(t *testing.T) {
t.Fatalf("CurrentNodeName() failed: %s", err)
}
volumeOptions := &VolumeOptions{
volumeOptions := &vclib.VolumeOptions{
CapacityKB: 1 * 1024 * 1024,
Tags: nil,
Name: "kubernetes-test-volume-" + rand.String(10),
@ -233,7 +234,7 @@ func TestVolumes(t *testing.T) {
t.Fatalf("Cannot create a new VMDK volume: %v", err)
}
_, _, err = vs.AttachDisk(volPath, "", "")
_, err = vs.AttachDisk(volPath, "", "")
if err != nil {
t.Fatalf("Cannot attach volume(%s) to VM(%s): %v", volPath, nodeName, err)
}
@ -249,36 +250,3 @@ func TestVolumes(t *testing.T) {
// t.Fatalf("Cannot delete VMDK volume %s: %v", volPath, err)
// }
}
// TestGetVMName verifies that getVMName resolves this node's VM name from
// vCenter. Skipped unless vSphere credentials are present in the environment.
func TestGetVMName(t *testing.T) {
	cfg, ok := configFromEnv()
	if !ok {
		t.Skipf("No config found in environment")
	}
	// Create vSphere configuration object
	vs, err := newVSphere(cfg)
	if err != nil {
		t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
	}
	// Create context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Create vSphere client. Use Fatalf here: the original used Errorf and
	// then continued into vs.client.Logout/getVMName with an unauthenticated
	// client, which would panic. Also fixes the "vSpere" typo in the message.
	err = vSphereLogin(ctx, vs)
	if err != nil {
		t.Fatalf("Failed to create vSphere client: %s", err)
	}
	defer vs.client.Logout(ctx)
	// Get VM name
	vmName, err := getVMName(vs.client, &cfg)
	if err != nil {
		t.Fatalf("Failed to get VM name: %s", err)
	}
	if vmName != "vmname" {
		t.Errorf("Expect VM name 'vmname', got: %s", vmName)
	}
}

View File

@ -18,21 +18,23 @@ package vsphere
import (
"context"
"errors"
"io/ioutil"
"os"
"runtime"
"strings"
"time"
"github.com/golang/glog"
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/mo"
"fmt"
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/pbm"
"github.com/vmware/govmomi/property"
"github.com/vmware/govmomi/vim25/mo"
"github.com/vmware/govmomi/vim25/types"
pbmtypes "github.com/vmware/govmomi/pbm/types"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib/diskmanagers"
)
const (
@ -42,15 +44,17 @@ const (
VirtualMachine = "VirtualMachine"
)
// Reads vSphere configuration from system environment and construct vSphere object
// GetVSphere reads vSphere configuration from system environment and construct vSphere object
func GetVSphere() (*VSphere, error) {
cfg := getVSphereConfig()
client, err := GetgovmomiClient(cfg)
vSphereConn := getVSphereConn(cfg)
client, err := GetgovmomiClient(vSphereConn)
if err != nil {
return nil, err
}
vSphereConn.GoVmomiClient = client
vs := &VSphere{
client: client,
conn: vSphereConn,
cfg: cfg,
localInstanceID: "",
}
@ -75,233 +79,217 @@ func getVSphereConfig() *VSphereConfig {
return &cfg
}
func GetgovmomiClient(cfg *VSphereConfig) (*govmomi.Client, error) {
if cfg == nil {
cfg = getVSphereConfig()
func getVSphereConn(cfg *VSphereConfig) *vclib.VSphereConnection {
vSphereConn := &vclib.VSphereConnection{
Username: cfg.Global.User,
Password: cfg.Global.Password,
Hostname: cfg.Global.VCenterIP,
Insecure: cfg.Global.InsecureFlag,
RoundTripperCount: cfg.Global.RoundTripperCount,
Port: cfg.Global.VCenterPort,
}
client, err := newClient(context.TODO(), cfg)
return vSphereConn
}
// GetgovmomiClient gets the goVMOMI client for the vsphere connection object
func GetgovmomiClient(conn *vclib.VSphereConnection) (*govmomi.Client, error) {
if conn == nil {
cfg := getVSphereConfig()
conn = getVSphereConn(cfg)
}
client, err := conn.NewClient(context.TODO())
return client, err
}
// Get placement compatibility result based on storage policy requirements.
func (vs *VSphere) GetPlacementCompatibilityResult(ctx context.Context, pbmClient *pbm.Client, storagePolicyID string) (pbm.PlacementCompatibilityResult, error) {
datastores, err := vs.getSharedDatastoresInK8SCluster(ctx)
// getvmUUID gets the BIOS UUID via the sys interface. This UUID is known by vsphere
func getvmUUID() (string, error) {
id, err := ioutil.ReadFile(UUIDPath)
if err != nil {
return nil, err
return "", fmt.Errorf("error retrieving vm uuid: %s", err)
}
var hubs []pbmtypes.PbmPlacementHub
for _, ds := range datastores {
hubs = append(hubs, pbmtypes.PbmPlacementHub{
HubType: ds.Type,
HubId: ds.Value,
})
uuidFromFile := string(id[:])
//strip leading and trailing white space and new line char
uuid := strings.TrimSpace(uuidFromFile)
// check the uuid starts with "VMware-"
if !strings.HasPrefix(uuid, UUIDPrefix) {
return "", fmt.Errorf("Failed to match Prefix, UUID read from the file is %v", uuidFromFile)
}
req := []pbmtypes.BasePbmPlacementRequirement{
&pbmtypes.PbmPlacementCapabilityProfileRequirement{
ProfileId: pbmtypes.PbmProfileId{
UniqueId: storagePolicyID,
},
},
// Strip the prefix and while spaces and -
uuid = strings.Replace(uuid[len(UUIDPrefix):(len(uuid))], " ", "", -1)
uuid = strings.Replace(uuid, "-", "", -1)
if len(uuid) != 32 {
return "", fmt.Errorf("Length check failed, UUID read from the file is %v", uuidFromFile)
}
res, err := pbmClient.CheckRequirements(ctx, hubs, nil, req)
if err != nil {
return nil, err
}
return res, nil
}
// Verify if the user specified datastore is in the list of non-compatible datastores.
// If yes, return the non compatible datastore reference.
func (vs *VSphere) IsUserSpecifiedDatastoreNonCompatible(ctx context.Context, compatibilityResult pbm.PlacementCompatibilityResult, dsName string) (bool, *types.ManagedObjectReference) {
dsMoList := vs.GetNonCompatibleDatastoresMo(ctx, compatibilityResult)
for _, ds := range dsMoList {
if ds.Info.GetDatastoreInfo().Name == dsName {
dsMoRef := ds.Reference()
return true, &dsMoRef
}
}
return false, nil
}
func GetNonCompatibleDatastoreFaultMsg(compatibilityResult pbm.PlacementCompatibilityResult, dsMoref types.ManagedObjectReference) string {
var faultMsg string
for _, res := range compatibilityResult {
if res.Hub.HubId == dsMoref.Value {
for _, err := range res.Error {
faultMsg = faultMsg + err.LocalizedMessage
}
}
}
return faultMsg
}
// Get the best fit compatible datastore by free space.
func GetMostFreeDatastore(dsMo []mo.Datastore) mo.Datastore {
var curMax int64
curMax = -1
var index int
for i, ds := range dsMo {
dsFreeSpace := ds.Info.GetDatastoreInfo().FreeSpace
if dsFreeSpace > curMax {
curMax = dsFreeSpace
index = i
}
}
return dsMo[index]
}
func (vs *VSphere) GetCompatibleDatastoresMo(ctx context.Context, compatibilityResult pbm.PlacementCompatibilityResult) ([]mo.Datastore, error) {
compatibleHubs := compatibilityResult.CompatibleDatastores()
// Return an error if there are no compatible datastores.
if len(compatibleHubs) < 1 {
return nil, fmt.Errorf("There are no compatible datastores that satisfy the storage policy requirements")
}
dsMoList, err := vs.getDatastoreMo(ctx, compatibleHubs)
if err != nil {
return nil, err
}
return dsMoList, nil
}
func (vs *VSphere) GetNonCompatibleDatastoresMo(ctx context.Context, compatibilityResult pbm.PlacementCompatibilityResult) []mo.Datastore {
nonCompatibleHubs := compatibilityResult.NonCompatibleDatastores()
// Return an error if there are no compatible datastores.
if len(nonCompatibleHubs) < 1 {
return nil
}
dsMoList, err := vs.getDatastoreMo(ctx, nonCompatibleHubs)
if err != nil {
return nil
}
return dsMoList
}
// Get the datastore managed objects for the place hubs using property collector.
func (vs *VSphere) getDatastoreMo(ctx context.Context, hubs []pbmtypes.PbmPlacementHub) ([]mo.Datastore, error) {
var dsMoRefs []types.ManagedObjectReference
for _, hub := range hubs {
dsMoRefs = append(dsMoRefs, types.ManagedObjectReference{
Type: hub.HubType,
Value: hub.HubId,
})
}
pc := property.DefaultCollector(vs.client.Client)
var dsMoList []mo.Datastore
err := pc.Retrieve(ctx, dsMoRefs, []string{DatastoreInfoProperty}, &dsMoList)
if err != nil {
return nil, err
}
return dsMoList, nil
// need to add dashes, e.g. "564d395e-d807-e18a-cb25-b79f65eb2b9f"
uuid = fmt.Sprintf("%s-%s-%s-%s-%s", uuid[0:8], uuid[8:12], uuid[12:16], uuid[16:20], uuid[20:32])
return uuid, nil
}
// Get all datastores accessible for the virtual machine object.
func (vs *VSphere) getSharedDatastoresInK8SCluster(ctx context.Context) ([]types.ManagedObjectReference, error) {
f := find.NewFinder(vs.client.Client, true)
dc, err := f.Datacenter(ctx, vs.cfg.Global.Datacenter)
f.SetDatacenter(dc)
vmFolder, err := f.Folder(ctx, strings.TrimSuffix(vs.cfg.Global.WorkingDir, "/"))
func getSharedDatastoresInK8SCluster(ctx context.Context, folder *vclib.Folder) ([]*vclib.Datastore, error) {
vmList, err := folder.GetVirtualMachines(ctx)
if err != nil {
glog.Errorf("Failed to get virtual machines in the kubernetes cluster: %s, err: %+v", folder.InventoryPath, err)
return nil, err
}
vmMoList, err := vs.GetVMsInsideFolder(ctx, vmFolder, []string{NameProperty})
if err != nil {
return nil, err
if vmList == nil || len(vmList) == 0 {
glog.Errorf("No virtual machines found in the kubernetes cluster: %s", folder.InventoryPath)
return nil, fmt.Errorf("No virtual machines found in the kubernetes cluster: %s", folder.InventoryPath)
}
index := 0
var sharedDs []string
for _, vmMo := range vmMoList {
if !strings.HasPrefix(vmMo.Name, DummyVMPrefixName) {
accessibleDatastores, err := vs.getAllAccessibleDatastores(ctx, vmMo)
var sharedDatastores []*vclib.Datastore
for _, vm := range vmList {
vmName, err := vm.ObjectName(ctx)
if err != nil {
return nil, err
}
if !strings.HasPrefix(vmName, DummyVMPrefixName) {
accessibleDatastores, err := vm.GetAllAccessibleDatastores(ctx)
if err != nil {
return nil, err
}
if index == 0 {
sharedDs = accessibleDatastores
sharedDatastores = accessibleDatastores
} else {
sharedDs = intersect(sharedDs, accessibleDatastores)
if len(sharedDs) == 0 {
return nil, fmt.Errorf("No shared datastores found in the Kubernetes cluster")
sharedDatastores = intersect(sharedDatastores, accessibleDatastores)
if len(sharedDatastores) == 0 {
return nil, fmt.Errorf("No shared datastores found in the Kubernetes cluster: %s", folder.InventoryPath)
}
}
index++
}
}
var sharedDSMorefs []types.ManagedObjectReference
for _, ds := range sharedDs {
sharedDSMorefs = append(sharedDSMorefs, types.ManagedObjectReference{
Value: ds,
Type: "Datastore",
})
}
return sharedDSMorefs, nil
return sharedDatastores, nil
}
func intersect(list1 []string, list2 []string) []string {
var sharedList []string
func intersect(list1 []*vclib.Datastore, list2 []*vclib.Datastore) []*vclib.Datastore {
var sharedDs []*vclib.Datastore
for _, val1 := range list1 {
// Check if val1 is found in list2
for _, val2 := range list2 {
if val1 == val2 {
sharedList = append(sharedList, val1)
if val1.Reference().Value == val2.Reference().Value {
sharedDs = append(sharedDs, val1)
break
}
}
}
return sharedList
}
// Get the VM list inside a folder.
func (vs *VSphere) GetVMsInsideFolder(ctx context.Context, vmFolder *object.Folder, properties []string) ([]mo.VirtualMachine, error) {
vmFolders, err := vmFolder.Children(ctx)
if err != nil {
return nil, err
}
pc := property.DefaultCollector(vs.client.Client)
var vmRefs []types.ManagedObjectReference
var vmMoList []mo.VirtualMachine
for _, vmFolder := range vmFolders {
if vmFolder.Reference().Type == VirtualMachine {
vmRefs = append(vmRefs, vmFolder.Reference())
}
}
err = pc.Retrieve(ctx, vmRefs, properties, &vmMoList)
if err != nil {
return nil, err
}
return vmMoList, nil
return sharedDs
}
// Get the datastores accessible for the virtual machine object.
func (vs *VSphere) getAllAccessibleDatastores(ctx context.Context, vmMo mo.VirtualMachine) ([]string, error) {
f := find.NewFinder(vs.client.Client, true)
dc, err := f.Datacenter(ctx, vs.cfg.Global.Datacenter)
if err != nil {
return nil, err
func getAllAccessibleDatastores(ctx context.Context, client *vim25.Client, vmMo mo.VirtualMachine) ([]string, error) {
host := vmMo.Summary.Runtime.Host
if host == nil {
return nil, errors.New("VM doesn't have a HostSystem")
}
f.SetDatacenter(dc)
vmRegex := vs.cfg.Global.WorkingDir + vmMo.Name
vmObj, err := f.VirtualMachine(ctx, vmRegex)
if err != nil {
return nil, err
}
host, err := vmObj.HostSystem(ctx)
if err != nil {
return nil, err
}
var hostSystemMo mo.HostSystem
s := object.NewSearchIndex(vs.client.Client)
err = s.Properties(ctx, host.Reference(), []string{DatastoreProperty}, &hostSystemMo)
s := object.NewSearchIndex(client)
err := s.Properties(ctx, host.Reference(), []string{DatastoreProperty}, &hostSystemMo)
if err != nil {
return nil, err
}
var dsRefValues []string
for _, dsRef := range hostSystemMo.Datastore {
dsRefValues = append(dsRefValues, dsRef.Value)
}
return dsRefValues, nil
}
// getMostFreeDatastoreName gets the best fit compatible datastore by free space.
// It returns the name of the datastore in dsObjList whose managed object
// reports the largest FreeSpace. Returns an error when dsObjList is empty
// or when the datastore managed objects cannot be retrieved.
func getMostFreeDatastoreName(ctx context.Context, client *vim25.Client, dsObjList []*vclib.Datastore) (string, error) {
	// Guard: the original indexed dsObjList[0] unconditionally, which
	// panics when the caller passes an empty list.
	if len(dsObjList) == 0 {
		return "", fmt.Errorf("no datastores to select the most free datastore from")
	}
	dsMoList, err := dsObjList[0].Datacenter.GetDatastoreMoList(ctx, dsObjList, []string{DatastoreInfoProperty})
	if err != nil {
		return "", err
	}
	// Track the index of the datastore with the most free space seen so far.
	curMax := int64(-1)
	index := 0
	for i, dsMo := range dsMoList {
		if dsFreeSpace := dsMo.Info.GetDatastoreInfo().FreeSpace; dsFreeSpace > curMax {
			curMax = dsFreeSpace
			index = i
		}
	}
	return dsMoList[index].Info.GetDatastoreInfo().Name, nil
}
// getPbmCompatibleDatastore resolves the named storage policy to a profile
// ID, intersects it with the datastores shared by every node VM under
// folder, and returns the name of the compatible datastore with the most
// free space.
func getPbmCompatibleDatastore(ctx context.Context, client *vim25.Client, storagePolicyName string, folder *vclib.Folder) (string, error) {
	pc, err := vclib.NewPbmClient(ctx, client)
	if err != nil {
		return "", err
	}
	// Translate the human-readable policy name into its profile ID.
	policyID, err := pc.ProfileIDByName(ctx, storagePolicyName)
	if err != nil {
		glog.Errorf("Failed to get Profile ID by name: %s. err: %+v", storagePolicyName, err)
		return "", err
	}
	// Candidate datastores must be accessible from every node VM.
	shared, err := getSharedDatastoresInK8SCluster(ctx, folder)
	if err != nil {
		glog.Errorf("Failed to get shared datastores from kubernetes cluster: %s. err: %+v", folder.InventoryPath, err)
		return "", err
	}
	compatible, _, err := pc.GetCompatibleDatastores(ctx, policyID, shared)
	if err != nil {
		glog.Errorf("Failed to get compatible datastores from datastores : %+v with storagePolicy: %s. err: %+v", shared, policyID, err)
		return "", err
	}
	// Among the policy-compatible candidates, pick the one with most free space.
	name, err := getMostFreeDatastoreName(ctx, client, compatible)
	if err != nil {
		glog.Errorf("Failed to get most free datastore from compatible datastores: %+v. err: %+v", compatible, err)
		return "", err
	}
	return name, nil
}
// setVMOptions builds the VMOptions (VM folder plus resource pool) used for
// dummy-VM creation, derived from this node's own VM under the configured
// working directory in dc.
func (vs *VSphere) setVMOptions(ctx context.Context, dc *vclib.Datacenter) (*vclib.VMOptions, error) {
	nodeVM, err := dc.GetVMByPath(ctx, vs.cfg.Global.WorkingDir+"/"+vs.localInstanceID)
	if err != nil {
		return nil, err
	}
	// The dummy VM is placed in the same resource pool as this node's VM.
	pool, err := nodeVM.GetResourcePool(ctx)
	if err != nil {
		return nil, err
	}
	workingDirFolder, err := dc.GetFolderByPath(ctx, vs.cfg.Global.WorkingDir)
	if err != nil {
		return nil, err
	}
	return &vclib.VMOptions{
		VMFolder:       workingDirFolder,
		VMResourcePool: pool,
	}, nil
}
// cleanUpDummyVMs is a background routine responsible for deleting stale
// dummy VMs from the kubernetes working-directory folder. It loops forever,
// waking every CleanUpDummyVMRoutineInterval minutes; run it on its own
// goroutine.
func (vs *VSphere) cleanUpDummyVMs(dummyVMPrefix string) {
	// Create context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for {
		time.Sleep(CleanUpDummyVMRoutineInterval * time.Minute)
		// Ensure client is logged in and session is valid
		err := vs.conn.Connect(ctx)
		if err != nil {
			glog.V(4).Infof("Failed to connect to VC with err: %+v. Retrying again...", err)
			continue
		}
		dc, err := vclib.GetDatacenter(ctx, vs.conn, vs.cfg.Global.Datacenter)
		if err != nil {
			glog.V(4).Infof("Failed to get the datacenter: %s from VC. err: %+v", vs.cfg.Global.Datacenter, err)
			continue
		}
		// Get the folder reference for global working directory where the dummy VM needs to be created.
		vmFolder, err := dc.GetFolderByPath(ctx, vs.cfg.Global.WorkingDir)
		if err != nil {
			glog.V(4).Infof("Unable to get the kubernetes folder: %q reference. err: %+v", vs.cfg.Global.WorkingDir, err)
			continue
		}
		// BUG FIX: the original did `defer cleanUpDummyVMLock.Lock()`, which
		// (a) deferred *acquiring* the lock instead of holding it during the
		// cleanup, and (b) a defer inside this infinite loop never executes
		// at all. Hold the write lock across the cleanup so the routine does
		// not delete VMs being created by ongoing PVC requests, and release
		// it at the end of each iteration.
		cleanUpDummyVMLock.Lock()
		err = diskmanagers.CleanUpDummyVMs(ctx, vmFolder, dc)
		if err != nil {
			glog.V(4).Infof("Unable to clean up dummy VM's in the kubernetes cluster: %q. err: %+v", vs.cfg.Global.WorkingDir, err)
		}
		cleanUpDummyVMLock.Unlock()
	}
}

View File

@ -19,6 +19,7 @@ go_library(
deps = [
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/vsphere:go_default_library",
"//pkg/cloudprovider/providers/vsphere/vclib:go_default_library",
"//pkg/util/keymutex:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
@ -43,7 +44,7 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/cloudprovider/providers/vsphere:go_default_library",
"//pkg/cloudprovider/providers/vsphere/vclib:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",

View File

@ -75,7 +75,7 @@ func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, nodeName types.No
// vsphereCloud.AttachDisk checks if disk is already attached to host and
// succeeds in that case, so no need to do that separately.
_, diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyID, nodeName)
diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, volumeSource.StoragePolicyID, nodeName)
if err != nil {
glog.Errorf("Error attaching volume %q to node %q: %+v", volumeSource.VolumePath, nodeName, err)
return "", err

View File

@ -21,7 +21,7 @@ import (
"testing"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -233,29 +233,29 @@ type diskIsAttachedCall struct {
ret error
}
func (testcase *testcase) AttachDisk(diskName string, storagePolicyName string, nodeName types.NodeName) (string, string, error) {
func (testcase *testcase) AttachDisk(diskName string, storagePolicyName string, nodeName types.NodeName) (string, error) {
expected := &testcase.attach
if expected.diskName == "" && expected.nodeName == "" {
// testcase.attach looks uninitialized, test did not expect to call
// AttachDisk
testcase.t.Errorf("Unexpected AttachDisk call!")
return "", "", errors.New("Unexpected AttachDisk call!")
return "", errors.New("Unexpected AttachDisk call!")
}
if expected.diskName != diskName {
testcase.t.Errorf("Unexpected AttachDisk call: expected diskName %s, got %s", expected.diskName, diskName)
return "", "", errors.New("Unexpected AttachDisk call: wrong diskName")
return "", errors.New("Unexpected AttachDisk call: wrong diskName")
}
if expected.nodeName != nodeName {
testcase.t.Errorf("Unexpected AttachDisk call: expected nodeName %s, got %s", expected.nodeName, nodeName)
return "", "", errors.New("Unexpected AttachDisk call: wrong nodeName")
return "", errors.New("Unexpected AttachDisk call: wrong nodeName")
}
glog.V(4).Infof("AttachDisk call: %s, %s, returning %q, %v", diskName, nodeName, expected.retDeviceUUID, expected.ret)
return "", expected.retDeviceUUID, expected.ret
return expected.retDeviceUUID, expected.ret
}
func (testcase *testcase) DetachDisk(diskName string, nodeName types.NodeName) error {
@ -312,7 +312,7 @@ func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.No
return nil, errors.New("Not implemented")
}
func (testcase *testcase) CreateVolume(volumeOptions *vsphere.VolumeOptions) (volumePath string, err error) {
func (testcase *testcase) CreateVolume(volumeOptions *vclib.VolumeOptions) (volumePath string, err error) {
return "", errors.New("Not implemented")
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
"k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere/vclib"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -94,7 +95,7 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec
// vSphere works with kilobytes, convert to KiB with rounding up
volSizeKB := int(volume.RoundUpSize(volSizeBytes, 1024))
name := volume.GenerateVolumeName(v.options.ClusterName, v.options.PVName, 255)
volumeOptions := &vsphere.VolumeOptions{
volumeOptions := &vclib.VolumeOptions{
CapacityKB: volSizeKB,
Tags: *v.options.CloudTags,
Name: name,
@ -129,7 +130,7 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec
if volumeOptions.VSANStorageProfileData != "" {
if volumeOptions.StoragePolicyName != "" {
return nil, fmt.Errorf("Cannot specify storage policy capabilities along with storage policy name. Please specify only one.")
return nil, fmt.Errorf("Cannot specify storage policy capabilities along with storage policy name. Please specify only one")
}
volumeOptions.VSANStorageProfileData = "(" + volumeOptions.VSANStorageProfileData + ")"
}
@ -141,7 +142,6 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (volSpec
vmDiskPath, err := cloud.CreateVolume(volumeOptions)
if err != nil {
glog.V(2).Infof("Error creating vsphere volume: %v", err)
return nil, err
}
volSpec = &VolumeSpec{