From e89ad04422fad2aa43ab0b2fe730877c81d53e11 Mon Sep 17 00:00:00 2001 From: Abrar Shivani Date: Thu, 4 Aug 2016 16:28:35 -0700 Subject: [PATCH] Implements Attacher Plugin Interface for vSphere --- cmd/kube-controller-manager/app/plugins.go | 1 + .../providers/vsphere/vsphere.go | 187 ++++++++++- pkg/volume/vsphere_volume/attacher.go | 243 ++++++++++++++ pkg/volume/vsphere_volume/attacher_test.go | 314 ++++++++++++++++++ pkg/volume/vsphere_volume/vsphere_volume.go | 100 +----- .../vsphere_volume/vsphere_volume_test.go | 41 --- .../vsphere_volume/vsphere_volume_util.go | 153 +++------ 7 files changed, 792 insertions(+), 247 deletions(-) create mode 100644 pkg/volume/vsphere_volume/attacher.go create mode 100644 pkg/volume/vsphere_volume/attacher_test.go diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 8ea23ecca2a..5cf8b188831 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -59,6 +59,7 @@ func ProbeAttachableVolumePlugins(config componentconfig.VolumeConfiguration) [] allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...) allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(config.FlexVolumePluginDir)...) + allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...) return allPlugins } diff --git a/pkg/cloudprovider/providers/vsphere/vsphere.go b/pkg/cloudprovider/providers/vsphere/vsphere.go index d6ae0135abf..a6655618d20 100644 --- a/pkg/cloudprovider/providers/vsphere/vsphere.go +++ b/pkg/cloudprovider/providers/vsphere/vsphere.go @@ -66,6 +66,7 @@ var supportedSCSIControllerType = []string{"lsilogic-sas", "pvscsi"} var ErrNoDiskUUIDFound = errors.New("No disk UUID found") var ErrNoDiskIDFound = errors.New("No vSphere disk ID found") var ErrNoDevicesFound = errors.New("No devices found") +var ErrNonSupportedControllerType = errors.New("Disk is attached to non-supported controller type") // VSphere is an implementation of cloud provider Interface for VSphere. type VSphere struct { @@ -104,6 +105,27 @@ type VSphereConfig struct { } } +type Volumes interface { + // AttachDisk attaches given disk to given node. Current node + // is used when nodeName is empty string. + AttachDisk(vmDiskPath string, nodeName string) (diskID string, diskUUID string, err error) + + // DetachDisk detaches given disk to given node. Current node + // is used when nodeName is empty string. + // Assumption: If node doesn't exist, disk is already detached from node. + DetachDisk(volPath string, nodeName string) error + + // DiskIsAttached checks if a disk is attached to the given node. + // Assumption: If node doesn't exist, disk is not attached to the node. + DiskIsAttached(volPath, nodeName string) (bool, error) + + // CreateVolume creates a new vmdk with specified parameters. + CreateVolume(name string, size int, tags *map[string]string) (volumePath string, err error) + + // DeleteVolume deletes vmdk. + DeleteVolume(vmDiskPath string) error +} + // Parses vSphere cloud config file and stores it into VSphereConfig. func readConfig(config io.Reader) (VSphereConfig, error) { if config == nil { @@ -582,11 +604,18 @@ func (vs *VSphere) AttachDisk(vmDiskPath string, nodeName string) (diskID string } // Get VM device list - vm, vmDevices, ds, _, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance) + vm, vmDevices, ds, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance) if err != nil { return "", "", err } + attached, _ := checkDiskAttached(vmDiskPath, vmDevices, dc, c) + if attached { + diskID, _ = getVirtualDiskID(vmDiskPath, vmDevices, dc, c) + diskUUID, _ = getVirtualDiskUUIDByPath(vmDiskPath, dc, c) + return diskID, diskUUID, nil + } + var diskControllerType = vs.cfg.Disk.SCSIControllerType // find SCSI controller of particular type from VM devices allSCSIControllers := getSCSIControllers(vmDevices) @@ -768,6 +797,107 @@ func getAvailableSCSIController(scsiControllers []*types.VirtualController) *typ return nil } +// DiskIsAttached returns if disk is attached to the VM using controllers supported by the plugin. +func (vs *VSphere) DiskIsAttached(volPath string, nodeName string) (bool, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create vSphere client + c, err := vsphereLogin(vs.cfg, ctx) + if err != nil { + glog.Errorf("Failed to create vSphere client. err: %s", err) + return false, err + } + defer c.Logout(ctx) + + // Find virtual machine to attach disk to + var vSphereInstance string + if nodeName == "" { + vSphereInstance = vs.localInstanceID + } else { + vSphereInstance = nodeName + } + + nodeExist, err := vs.NodeExists(c, vSphereInstance) + + if err != nil { + glog.Errorf("Failed to check whether node exist. err: %s.", err) + return false, err + } + + if !nodeExist { + glog.Warningf( + "Node %q does not exist. DiskIsAttached will assume vmdk %q is not attached to it.", + vSphereInstance, + volPath) + return false, nil + } + + // Get VM device list + _, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance) + if err != nil { + glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err) + return false, err + } + + attached, err := checkDiskAttached(volPath, vmDevices, dc, c) + return attached, err +} + +func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) { + virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client) + if err != nil { + if err == ErrNoDevicesFound { + return false, nil + } + glog.Errorf("Failed to check whether disk is attached. err: %s", err) + return false, err + } + for _, controllerType := range supportedSCSIControllerType { + controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client) + if controllerkey == virtualDiskControllerKey { + return true, nil + } + } + return false, ErrNonSupportedControllerType + +} + +// Returns the object key that denotes the controller object to which vmdk is attached. +func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) { + volumeUUID, err := getVirtualDiskUUIDByPath(volPath, dc, client) + + if err != nil { + glog.Errorf("disk uuid not found for %v. err: %s", volPath, err) + return -1, err + } + + // filter vm devices to retrieve disk ID for the given vmdk file + for _, device := range vmDevices { + if vmDevices.TypeName(device) == "VirtualDisk" { + diskUUID, _ := getVirtualDiskUUID(device) + if diskUUID == volumeUUID { + return device.GetVirtualDevice().ControllerKey, nil + } + } + } + return -1, ErrNoDevicesFound +} + +// Returns key of the controller. +// Key is unique id that distinguishes one device from other devices in the same virtual machine. +func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) { + for _, device := range vmDevices { + devType := vmDevices.Type(device) + if devType == scsiType { + if c, ok := device.(types.BaseVirtualController); ok { + return c.GetVirtualController().Key, nil + } + } + } + return -1, ErrNoDevicesFound +} + // Returns formatted UUID for a virtual disk device. func getVirtualDiskUUID(newDevice types.BaseVirtualDevice) (string, error) { vd := newDevice.GetVirtualDevice() @@ -859,6 +989,21 @@ func (vs *VSphere) DetachDisk(volPath string, nodeName string) error { vSphereInstance = nodeName } + nodeExist, err := vs.NodeExists(c, vSphereInstance) + + if err != nil { + glog.Errorf("Failed to check whether node exist. err: %s.", err) + return err + } + + if !nodeExist { + glog.Warningf( + "Node %q does not exist. DetachDisk will assume vmdk %q is not attached to it.", + vSphereInstance, + volPath) + return nil + } + vm, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance) if err != nil { return err @@ -962,3 +1107,43 @@ func (vs *VSphere) DeleteVolume(vmDiskPath string) error { return task.Wait(ctx) } + +// NodeExists checks if the node with given nodeName exist. +// Returns false if VM doesn't exist or VM is in powerOff state. +func (vs *VSphere) NodeExists(c *govmomi.Client, nodeName string) (bool, error) { + + if nodeName == "" { + return false, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vm, err := getVirtualMachineByName(vs.cfg, ctx, c, nodeName) + if err != nil { + if _, ok := err.(*find.NotFoundError); ok { + return false, nil + } + glog.Errorf("Failed to get virtual machine object for node %+q. err %s", nodeName, err) + return false, err + } + + var mvm mo.VirtualMachine + err = getVirtualMachineManagedObjectReference(ctx, c, vm, "summary", &mvm) + if err != nil { + glog.Errorf("Failed to get virtual machine object reference for node %+q. err %s", nodeName, err) + return false, err + } + + if mvm.Summary.Runtime.PowerState == ActivePowerState { + return true, nil + } + + if mvm.Summary.Config.Template == false { + glog.Warningf("VM %s, is not in %s state", nodeName, ActivePowerState) + } else { + glog.Warningf("VM %s, is a template", nodeName) + } + + return false, nil +} diff --git a/pkg/volume/vsphere_volume/attacher.go b/pkg/volume/vsphere_volume/attacher.go new file mode 100644 index 00000000000..7bcc98844f8 --- /dev/null +++ b/pkg/volume/vsphere_volume/attacher.go @@ -0,0 +1,243 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere_volume + +import ( + "fmt" + "os" + "path" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/keymutex" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +type vsphereVMDKAttacher struct { + host volume.VolumeHost + vsphereVolumes vsphere.Volumes +} + +var _ volume.Attacher = &vsphereVMDKAttacher{} +var _ volume.AttachableVolumePlugin = &vsphereVolumePlugin{} + +// Singleton key mutex for keeping attach operations for the same host atomic +var attachdetachMutex = keymutex.NewKeyMutex() + +func (plugin *vsphereVolumePlugin) NewAttacher() (volume.Attacher, error) { + vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider()) + if err != nil { + return nil, err + } + + return &vsphereVMDKAttacher{ + host: plugin.host, + vsphereVolumes: vsphereCloud, + }, nil +} + +// Attaches the volume specified by the given spec to the given host. +// On success, returns the device path where the device was attached on the +// node. +// Callers are responsible for retryinging on failure. +// Callers are responsible for thread safety between concurrent attach and +// detach operations. +func (attacher *vsphereVMDKAttacher) Attach(spec *volume.Spec, hostName string) (string, error) { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + return "", err + } + + glog.V(4).Infof("vSphere: Attach disk called for host %s", hostName) + + // Keeps concurrent attach operations to same host atomic + attachdetachMutex.LockKey(hostName) + defer attachdetachMutex.UnlockKey(hostName) + + // vsphereCloud.AttachDisk checks if disk is already attached to host and + // succeeds in that case, so no need to do that separately. + _, diskUUID, err := attacher.vsphereVolumes.AttachDisk(volumeSource.VolumePath, hostName) + if err != nil { + glog.Errorf("Error attaching volume %q: %+v", volumeSource.VolumePath, err) + return "", err + } + + return path.Join(diskByIDPath, diskSCSIPrefix+diskUUID), nil +} + +func (attacher *vsphereVMDKAttacher) WaitForAttach(spec *volume.Spec, devicePath string, timeout time.Duration) (string, error) { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + return "", err + } + + if devicePath == "" { + return "", fmt.Errorf("WaitForAttach failed for VMDK %q: devicePath is empty.", volumeSource.VolumePath) + } + + ticker := time.NewTicker(checkSleepDuration) + defer ticker.Stop() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-ticker.C: + glog.V(5).Infof("Checking VMDK %q is attached", volumeSource.VolumePath) + path, err := verifyDevicePath(devicePath) + if err != nil { + // Log error, if any, and continue checking periodically. See issue #11321 + glog.Warningf("Error verifying VMDK (%q) is attached: %v", volumeSource.VolumePath, err) + } else if path != "" { + // A device path has successfully been created for the VMDK + glog.Infof("Successfully found attached VMDK %q.", volumeSource.VolumePath) + return path, nil + } + case <-timer.C: + return "", fmt.Errorf("Could not find attached VMDK %q. Timeout waiting for mount paths to be created.", volumeSource.VolumePath) + } + } +} + +// GetDeviceMountPath returns a path where the device should +// point which should be bind mounted for individual volumes. +func (attacher *vsphereVMDKAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) { + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + return "", err + } + + return makeGlobalPDPath(attacher.host, volumeSource.VolumePath), nil +} + +// GetMountDeviceRefs finds all other references to the device referenced +// by deviceMountPath; returns a list of paths. +func (plugin *vsphereVolumePlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) { + mounter := plugin.host.GetMounter() + return mount.GetMountRefs(mounter, deviceMountPath) +} + +// MountDevice mounts device to global mount point. +func (attacher *vsphereVMDKAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error { + mounter := attacher.host.GetMounter() + notMnt, err := mounter.IsLikelyNotMountPoint(deviceMountPath) + if err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(deviceMountPath, 0750); err != nil { + glog.Errorf("Failed to create directory at %#v. err: %s", deviceMountPath, err) + return err + } + notMnt = true + } else { + return err + } + } + + volumeSource, _, err := getVolumeSource(spec) + if err != nil { + return err + } + + options := []string{} + + if notMnt { + diskMounter := &mount.SafeFormatAndMount{Interface: mounter, Runner: exec.New()} + err = diskMounter.FormatAndMount(devicePath, deviceMountPath, volumeSource.FSType, options) + if err != nil { + os.Remove(deviceMountPath) + return err + } + glog.V(4).Infof("formatting spec %v devicePath %v deviceMountPath %v fs %v with options %+v", spec.Name(), devicePath, deviceMountPath, volumeSource.FSType, options) + } + return nil +} + +type vsphereVMDKDetacher struct { + mounter mount.Interface + vsphereVolumes vsphere.Volumes +} + +var _ volume.Detacher = &vsphereVMDKDetacher{} + +func (plugin *vsphereVolumePlugin) NewDetacher() (volume.Detacher, error) { + vsphereCloud, err := getCloudProvider(plugin.host.GetCloudProvider()) + if err != nil { + return nil, err + } + + return &vsphereVMDKDetacher{ + mounter: plugin.host.GetMounter(), + vsphereVolumes: vsphereCloud, + }, nil +} + +// Detach the given device from the given host. +func (detacher *vsphereVMDKDetacher) Detach(deviceMountPath string, hostName string) error { + + volPath := getVolPathfromDeviceMountPath(deviceMountPath) + attached, err := detacher.vsphereVolumes.DiskIsAttached(volPath, hostName) + if err != nil { + // Log error and continue with detach + glog.Errorf( + "Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v", + volPath, hostName, err) + } + + if err == nil && !attached { + // Volume is already detached from node. + glog.Infof("detach operation was successful. volume %q is already detached from node %q.", volPath, hostName) + return nil + } + + attachdetachMutex.LockKey(hostName) + defer attachdetachMutex.UnlockKey(hostName) + if err := detacher.vsphereVolumes.DetachDisk(volPath, hostName); err != nil { + glog.Errorf("Error detaching volume %q: %v", volPath, err) + return err + } + return nil +} + +func (detacher *vsphereVMDKDetacher) WaitForDetach(devicePath string, timeout time.Duration) error { + ticker := time.NewTicker(checkSleepDuration) + defer ticker.Stop() + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-ticker.C: + glog.V(5).Infof("Checking device %q is detached.", devicePath) + if pathExists, err := volumeutil.PathExists(devicePath); err != nil { + return fmt.Errorf("Error checking if device path exists: %v", err) + } else if !pathExists { + return nil + } + case <-timer.C: + return fmt.Errorf("Timeout reached; Device %v is still attached", devicePath) + } + } +} + +func (detacher *vsphereVMDKDetacher) UnmountDevice(deviceMountPath string) error { + return volumeutil.UnmountPath(deviceMountPath, detacher.mounter) +} diff --git a/pkg/volume/vsphere_volume/attacher_test.go b/pkg/volume/vsphere_volume/attacher_test.go new file mode 100644 index 00000000000..128b121989e --- /dev/null +++ b/pkg/volume/vsphere_volume/attacher_test.go @@ -0,0 +1,314 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere_volume + +import ( + "errors" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/volume" + volumetest "k8s.io/kubernetes/pkg/volume/testing" + + "github.com/golang/glog" +) + +func TestGetDeviceName_Volume(t *testing.T) { + plugin := newPlugin() + volPath := "[local] volumes/test" + spec := createVolSpec(volPath) + + deviceName, err := plugin.GetVolumeName(spec) + if err != nil { + t.Errorf("GetDeviceName error: %v", err) + } + if deviceName != volPath { + t.Errorf("GetDeviceName error: expected %s, got %s", volPath, deviceName) + } +} + +func TestGetDeviceName_PersistentVolume(t *testing.T) { + plugin := newPlugin() + volPath := "[local] volumes/test" + spec := createPVSpec(volPath) + + deviceName, err := plugin.GetVolumeName(spec) + if err != nil { + t.Errorf("GetDeviceName error: %v", err) + } + if deviceName != volPath { + t.Errorf("GetDeviceName error: expected %s, got %s", volPath, deviceName) + } +} + +// One testcase for TestAttachDetach table test below +type testcase struct { + name string + // For fake vSphere: + attach attachCall + detach detachCall + diskIsAttached diskIsAttachedCall + t *testing.T + + // Actual test to run + test func(test *testcase) (string, error) + // Expected return of the test + expectedDevice string + expectedError error +} + +func TestAttachDetach(t *testing.T) { + uuid := "00000000000000" + diskName := "[local] volumes/test" + hostName := "host" + spec := createVolSpec(diskName) + attachError := errors.New("Fake attach error") + detachError := errors.New("Fake detach error") + diskCheckError := errors.New("Fake DiskIsAttached error") + tests := []testcase{ + // Successful Attach call + { + name: "Attach_Positive", + attach: attachCall{diskName, hostName, uuid, nil}, + test: func(testcase *testcase) (string, error) { + attacher := newAttacher(testcase) + return attacher.Attach(spec, hostName) + }, + expectedDevice: "/dev/disk/by-id/wwn-0x" + uuid, + }, + + // Attach call fails + { + name: "Attach_Negative", + attach: attachCall{diskName, hostName, "", attachError}, + test: func(testcase *testcase) (string, error) { + attacher := newAttacher(testcase) + return attacher.Attach(spec, hostName) + }, + expectedError: attachError, + }, + + // Detach succeeds + { + name: "Detach_Positive", + diskIsAttached: diskIsAttachedCall{diskName, hostName, true, nil}, + detach: detachCall{diskName, hostName, nil}, + test: func(testcase *testcase) (string, error) { + detacher := newDetacher(testcase) + return "", detacher.Detach(diskName, hostName) + }, + }, + + // Disk is already detached + { + name: "Detach_Positive_AlreadyDetached", + diskIsAttached: diskIsAttachedCall{diskName, hostName, false, nil}, + test: func(testcase *testcase) (string, error) { + detacher := newDetacher(testcase) + return "", detacher.Detach(diskName, hostName) + }, + }, + + // Detach succeeds when DiskIsAttached fails + { + name: "Detach_Positive_CheckFails", + diskIsAttached: diskIsAttachedCall{diskName, hostName, false, diskCheckError}, + detach: detachCall{diskName, hostName, nil}, + test: func(testcase *testcase) (string, error) { + detacher := newDetacher(testcase) + return "", detacher.Detach(diskName, hostName) + }, + }, + + // Detach fails + { + name: "Detach_Negative", + diskIsAttached: diskIsAttachedCall{diskName, hostName, false, diskCheckError}, + detach: detachCall{diskName, hostName, detachError}, + test: func(testcase *testcase) (string, error) { + detacher := newDetacher(testcase) + return "", detacher.Detach(diskName, hostName) + }, + expectedError: detachError, + }, + } + + for _, testcase := range tests { + testcase.t = t + device, err := testcase.test(&testcase) + if err != testcase.expectedError { + t.Errorf("%s failed: expected err=%q, got %q", testcase.name, testcase.expectedError.Error(), err.Error()) + } + if device != testcase.expectedDevice { + t.Errorf("%s failed: expected device=%q, got %q", testcase.name, testcase.expectedDevice, device) + } + t.Logf("Test %q succeeded", testcase.name) + } +} + +// newPlugin creates a new vsphereVolumePlugin with fake cloud, NewAttacher +// and NewDetacher won't work. +func newPlugin() *vsphereVolumePlugin { + host := volumetest.NewFakeVolumeHost("/tmp", nil, nil, "") + plugins := ProbeVolumePlugins() + plugin := plugins[0] + plugin.Init(host) + return plugin.(*vsphereVolumePlugin) +} + +func newAttacher(testcase *testcase) *vsphereVMDKAttacher { + return &vsphereVMDKAttacher{ + host: nil, + vsphereVolumes: testcase, + } +} + +func newDetacher(testcase *testcase) *vsphereVMDKDetacher { + return &vsphereVMDKDetacher{ + vsphereVolumes: testcase, + } +} + +func createVolSpec(name string) *volume.Spec { + return &volume.Spec{ + Volume: &api.Volume{ + VolumeSource: api.VolumeSource{ + VsphereVolume: &api.VsphereVirtualDiskVolumeSource{ + VolumePath: name, + }, + }, + }, + } +} + +func createPVSpec(name string) *volume.Spec { + return &volume.Spec{ + PersistentVolume: &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + VsphereVolume: &api.VsphereVirtualDiskVolumeSource{ + VolumePath: name, + }, + }, + }, + }, + } +} + +// Fake vSphere implementation + +type attachCall struct { + diskName string + hostName string + retDeviceUUID string + ret error +} + +type detachCall struct { + diskName string + hostName string + ret error +} + +type diskIsAttachedCall struct { + diskName, hostName string + isAttached bool + ret error +} + +func (testcase *testcase) AttachDisk(diskName string, hostName string) (string, string, error) { + expected := &testcase.attach + + if expected.diskName == "" && expected.hostName == "" { + // testcase.attach looks uninitialized, test did not expect to call + // AttachDisk + testcase.t.Errorf("Unexpected AttachDisk call!") + return "", "", errors.New("Unexpected AttachDisk call!") + } + + if expected.diskName != diskName { + testcase.t.Errorf("Unexpected AttachDisk call: expected diskName %s, got %s", expected.diskName, diskName) + return "", "", errors.New("Unexpected AttachDisk call: wrong diskName") + } + + if expected.hostName != hostName { + testcase.t.Errorf("Unexpected AttachDisk call: expected hostName %s, got %s", expected.hostName, hostName) + return "", "", errors.New("Unexpected AttachDisk call: wrong hostName") + } + + glog.V(4).Infof("AttachDisk call: %s, %s, returning %q, %v", diskName, hostName, expected.retDeviceUUID, expected.ret) + + return "", expected.retDeviceUUID, expected.ret +} + +func (testcase *testcase) DetachDisk(diskName string, hostName string) error { + expected := &testcase.detach + + if expected.diskName == "" && expected.hostName == "" { + // testcase.detach looks uninitialized, test did not expect to call + // DetachDisk + testcase.t.Errorf("Unexpected DetachDisk call!") + return errors.New("Unexpected DetachDisk call!") + } + + if expected.diskName != diskName { + testcase.t.Errorf("Unexpected DetachDisk call: expected diskName %s, got %s", expected.diskName, diskName) + return errors.New("Unexpected DetachDisk call: wrong diskName") + } + + if expected.hostName != hostName { + testcase.t.Errorf("Unexpected DetachDisk call: expected hostname %s, got %s", expected.hostName, hostName) + return errors.New("Unexpected DetachDisk call: wrong hostname") + } + + glog.V(4).Infof("DetachDisk call: %s, %s, returning %v", diskName, hostName, expected.ret) + + return expected.ret +} + +func (testcase *testcase) DiskIsAttached(diskName, hostName string) (bool, error) { + expected := &testcase.diskIsAttached + + if expected.diskName == "" && expected.hostName == "" { + // testcase.diskIsAttached looks uninitialized, test did not expect to + // call DiskIsAttached + testcase.t.Errorf("Unexpected DiskIsAttached call!") + return false, errors.New("Unexpected DiskIsAttached call!") + } + + if expected.diskName != diskName { + testcase.t.Errorf("Unexpected DiskIsAttached call: expected diskName %s, got %s", expected.diskName, diskName) + return false, errors.New("Unexpected DiskIsAttached call: wrong diskName") + } + + if expected.hostName != hostName { + testcase.t.Errorf("Unexpected DiskIsAttached call: expected hostName %s, got %s", expected.hostName, hostName) + return false, errors.New("Unexpected DiskIsAttached call: wrong hostName") + } + + glog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v, %v", diskName, hostName, expected.isAttached, expected.ret) + + return expected.isAttached, expected.ret +} + +func (testcase *testcase) CreateVolume(name string, size int, tags *map[string]string) (volumePath string, err error) { + return "", errors.New("Not implemented") +} + +func (testcase *testcase) DeleteVolume(vmDiskPath string) error { + return errors.New("Not implemented") +} diff --git a/pkg/volume/vsphere_volume/vsphere_volume.go b/pkg/volume/vsphere_volume/vsphere_volume.go index 31b337d047f..e800eae0399 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume.go +++ b/pkg/volume/vsphere_volume/vsphere_volume.go @@ -17,16 +17,13 @@ limitations under the License. package vsphere_volume import ( - "errors" "fmt" "os" "path" - "strings" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/mount" @@ -121,20 +118,6 @@ func (plugin *vsphereVolumePlugin) newUnmounterInternal(volName string, podUID t }}, nil } -func (plugin *vsphereVolumePlugin) getCloudProvider() (*vsphere.VSphere, error) { - cloud := plugin.host.GetCloudProvider() - if cloud == nil { - glog.Errorf("Cloud provider not initialized properly") - return nil, errors.New("Cloud provider not initialized properly") - } - - vs := cloud.(*vsphere.VSphere) - if vs == nil { - return nil, errors.New("Invalid cloud provider: expected vSphere") - } - return vs, nil -} - func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { vsphereVolume := &api.Volume{ Name: volumeName, @@ -149,10 +132,6 @@ func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath str // Abstract interface to disk operations. type vdManager interface { - // Attaches the disk to the kubelet's host machine. - AttachDisk(mounter *vsphereVolumeMounter, globalPDPath string) error - // Detaches the disk from the kubelet's host machine. - DetachDisk(unmounter *vsphereVolumeUnmounter) error // Creates a volume CreateVolume(provisioner *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeGB int, err error) // Deletes a volume @@ -179,13 +158,6 @@ type vsphereVolume struct { volume.MetricsNil } -func detachDiskLogError(vv *vsphereVolume) { - err := vv.manager.DetachDisk(&vsphereVolumeUnmounter{vv}) - if err != nil { - glog.Warningf("Failed to detach disk: %v (%v)", vv, err) - } -} - var _ volume.Mounter = &vsphereVolumeMounter{} type vsphereVolumeMounter struct { @@ -219,23 +191,16 @@ func (b *vsphereVolumeMounter) SetUpAt(dir string, fsGroup *int64) error { glog.V(4).Infof("Something is already mounted to target %s", dir) return nil } - globalPDPath := makeGlobalPDPath(b.plugin.host, b.volPath) - if err := b.manager.AttachDisk(b, globalPDPath); err != nil { - glog.V(3).Infof("AttachDisk failed: %v", err) + + if err := os.MkdirAll(dir, 0750); err != nil { + glog.V(4).Infof("Could not create directory %s: %v", dir, err) return err } - glog.V(3).Infof("vSphere volume %s attached", b.volPath) options := []string{"bind"} - if err := os.MkdirAll(dir, 0750); err != nil { - // TODO: we should really eject the attach/detach out into its own control loop. - glog.V(4).Infof("Could not create directory %s: %v", dir, err) - detachDiskLogError(b.vsphereVolume) - return err - } - // Perform a bind mount to the full path to allow duplicate mounts of the same PD. + globalPDPath := makeGlobalPDPath(b.plugin.host, b.volPath) err = b.mounter.Mount(globalPDPath, dir, "", options) if err != nil { notmnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir) @@ -259,7 +224,6 @@ func (b *vsphereVolumeMounter) SetUpAt(dir string, fsGroup *int64) error { } } os.Remove(dir) - detachDiskLogError(b.vsphereVolume) return err } glog.V(3).Infof("vSphere volume %s mounted to %s", b.volPath, dir) @@ -283,69 +247,25 @@ func (v *vsphereVolumeUnmounter) TearDown() error { // resource was the last reference to that disk on the kubelet. func (v *vsphereVolumeUnmounter) TearDownAt(dir string) error { glog.V(5).Infof("vSphere Volume TearDown of %s", dir) - notmnt, err := v.mounter.IsLikelyNotMountPoint(dir) + notMnt, err := v.mounter.IsLikelyNotMountPoint(dir) if err != nil { - glog.V(4).Infof("Error checking if mountpoint ", dir, ": ", err) return err } - if notmnt { - glog.V(4).Infof("Not mount point,deleting") + if notMnt { return os.Remove(dir) } - - // Find vSphere volumeID to lock the right volume - refs, err := mount.GetMountRefs(v.mounter, dir) - if err != nil { - glog.V(4).Infof("Error getting mountrefs for ", dir, ": ", err) - return err - } - if len(refs) == 0 { - glog.V(4).Infof("Directory %s is not mounted", dir) - return fmt.Errorf("directory %s is not mounted", dir) - } - - mountPath := refs[0] - // Assumption: No file or folder is named starting with '[' in datastore - volumePath := mountPath[strings.LastIndex(mountPath, "["):] - // space between datastore and vmdk name in volumePath is encoded as '\040' when returned by GetMountRefs(). - // volumePath eg: "[local] xxx.vmdk" provided to attach/mount - // replacing \040 with space to match the actual volumePath - v.volPath = strings.Replace(volumePath, "\\040", " ", -1) - glog.V(4).Infof("Found volume %s mounted to %s", v.volPath, dir) - - // Reload list of references, there might be SetUpAt finished in the meantime - refs, err = mount.GetMountRefs(v.mounter, dir) - if err != nil { - glog.V(4).Infof("GetMountRefs failed: %v", err) - return err - } if err := v.mounter.Unmount(dir); err != nil { - glog.V(4).Infof("Unmount failed: %v", err) return err } - glog.V(3).Infof("Successfully unmounted: %s\n", dir) - - // If refCount is 1, then all bind mounts have been removed, and the - // remaining reference is the global mount. It is safe to detach. - if len(refs) == 1 { - if err := v.manager.DetachDisk(v); err != nil { - glog.V(4).Infof("DetachDisk failed: %v", err) - return err - } - glog.V(3).Infof("Volume %s detached", v.volPath) - } - notmnt, mntErr := v.mounter.IsLikelyNotMountPoint(dir) + notMnt, mntErr := v.mounter.IsLikelyNotMountPoint(dir) if mntErr != nil { glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr) return err } - if notmnt { - if err := os.Remove(dir); err != nil { - glog.V(4).Infof("Failed to remove directory after unmount: %v", err) - return err - } + if notMnt { + return os.Remove(dir) } - return nil + return fmt.Errorf("Failed to unmount volume dir") } func makeGlobalPDPath(host volume.VolumeHost, devName string) string { diff --git a/pkg/volume/vsphere_volume/vsphere_volume_test.go b/pkg/volume/vsphere_volume/vsphere_volume_test.go index eed3dacf0c3..aa7e888ae14 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume_test.go +++ b/pkg/volume/vsphere_volume/vsphere_volume_test.go @@ -58,46 +58,12 @@ func TestCanSupport(t *testing.T) { } type fakePDManager struct { - attachCalled bool - detachCalled bool } func getFakeDeviceName(host volume.VolumeHost, volPath string) string { return path.Join(host.GetPluginDir(vsphereVolumePluginName), "device", volPath) } -func (fake *fakePDManager) AttachDisk(b *vsphereVolumeMounter, globalPDPath string) error { - fakeDeviceName := getFakeDeviceName(b.plugin.host, b.volPath) - err := os.MkdirAll(fakeDeviceName, 0750) - if err != nil { - return err - } - fake.attachCalled = true - // Simulate the global mount so that the fakeMounter returns the - // expected number of mounts for the attached disk. - err = b.mounter.Mount(fakeDeviceName, globalPDPath, "", []string{"bind"}) - if err != nil { - return err - } - return nil -} - -func (fake *fakePDManager) DetachDisk(v *vsphereVolumeUnmounter) error { - globalPath := makeGlobalPDPath(v.plugin.host, v.volPath) - fakeDeviceName := getFakeDeviceName(v.plugin.host, v.volPath) - err := v.mounter.Unmount(globalPath) - if err != nil { - return err - } - // "Detach" the fake "device" - err = os.RemoveAll(fakeDeviceName) - if err != nil { - return err - } - fake.detachCalled = true - return nil -} - func (fake *fakePDManager) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) { return "[local] test-volume-name.vmdk", 100, nil } @@ -156,10 +122,6 @@ func TestPlugin(t *testing.T) { t.Errorf("Expected success, got: %v", err) } - if !fakeManager.attachCalled { - t.Errorf("Attach watch not called") - } - // Test Unmounter fakeManager = &fakePDManager{} unmounter, err := plug.(*vsphereVolumePlugin).newUnmounterInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter) @@ -178,9 +140,6 @@ func TestPlugin(t *testing.T) { } else if !os.IsNotExist(err) { t.Errorf("SetUp() failed: %v", err) } - if !fakeManager.detachCalled { - t.Errorf("Detach watch not called") - } // Test Provisioner cap := resource.MustParse("100Mi") diff --git a/pkg/volume/vsphere_volume/vsphere_volume_util.go b/pkg/volume/vsphere_volume/vsphere_volume_util.go index e0546ee6e20..6f4947e0991 100644 --- a/pkg/volume/vsphere_volume/vsphere_volume_util.go +++ b/pkg/volume/vsphere_volume/vsphere_volume_util.go @@ -18,140 +18,41 @@ package vsphere_volume import ( "errors" - "io/ioutil" - "os" - "path" + "fmt" "strings" "time" "github.com/golang/glog" - "k8s.io/kubernetes/pkg/util/keymutex" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere" "k8s.io/kubernetes/pkg/volume" + volumeutil "k8s.io/kubernetes/pkg/volume/util" ) const ( - maxRetries = 10 + maxRetries = 10 + checkSleepDuration = time.Second + diskByIDPath = "/dev/disk/by-id/" + diskSCSIPrefix = "wwn-0x" ) var ErrProbeVolume = errors.New("Error scanning attached volumes") -// Singleton key mutex for keeping attach/detach operations for the same PD atomic -var attachDetachMutex = keymutex.NewKeyMutex() - type VsphereDiskUtil struct{} -// Attaches a disk to the current kubelet. -// Mounts the disk to it's global path. -func (util *VsphereDiskUtil) AttachDisk(vm *vsphereVolumeMounter, globalPDPath string) error { - options := []string{} - - // Block execution until any pending attach/detach operations for this PD have completed - attachDetachMutex.LockKey(vm.volPath) - defer attachDetachMutex.UnlockKey(vm.volPath) - - cloud, err := vm.plugin.getCloudProvider() - if err != nil { - return err +func verifyDevicePath(path string) (string, error) { + if pathExists, err := volumeutil.PathExists(path); err != nil { + return "", fmt.Errorf("Error checking if path exists: %v", err) + } else if pathExists { + return path, nil } - diskID, diskUUID, attachError := cloud.AttachDisk(vm.volPath, "") - if attachError != nil { - return attachError - } else if diskUUID == "" { - return errors.New("Disk UUID has no value") - } - - // diskID for detach Disk - vm.diskID = diskID - - var devicePath string - numTries := 0 - for { - devicePath = verifyDevicePath(diskUUID) - - _, err := os.Stat(devicePath) - if err == nil { - break - } - if err != nil && !os.IsNotExist(err) { - return err - } - numTries++ - if numTries == maxRetries { - return errors.New("Could not attach disk: Timeout after 60s") - } - time.Sleep(time.Second * 60) - } - - notMnt, err := vm.mounter.IsLikelyNotMountPoint(globalPDPath) - if err != nil { - if os.IsNotExist(err) { - if err := os.MkdirAll(globalPDPath, 0750); err != nil { - return err - } - notMnt = true - } else { - return err - } - } - if notMnt { - err = vm.diskMounter.FormatAndMount(devicePath, globalPDPath, vm.fsType, options) - if err != nil { - os.Remove(globalPDPath) - return err - } - glog.V(2).Infof("Safe mount successful: %q\n", devicePath) - } - return nil -} - -func verifyDevicePath(diskUUID string) string { - files, _ := ioutil.ReadDir("/dev/disk/by-id/") - for _, f := range files { - // TODO: should support other controllers - if strings.Contains(f.Name(), "scsi-") { - devID := f.Name()[len("scsi-"):len(f.Name())] - if strings.Contains(devID, diskUUID) { - glog.V(4).Infof("Found disk attached as %q; full devicepath: %s\n", f.Name(), path.Join("/dev/disk/by-id/", f.Name())) - return path.Join("/dev/disk/by-id/", f.Name()) - } - } - } - glog.Warningf("Failed to find device for the diskid: %q\n", diskUUID) - return "" -} - -// Unmounts the device and detaches the disk from the kubelet's host machine. -func (util *VsphereDiskUtil) DetachDisk(vu *vsphereVolumeUnmounter) error { - - // Block execution until any pending attach/detach operations for this PD have completed - attachDetachMutex.LockKey(vu.volPath) - defer attachDetachMutex.UnlockKey(vu.volPath) - - globalPDPath := makeGlobalPDPath(vu.plugin.host, vu.volPath) - if err := vu.mounter.Unmount(globalPDPath); err != nil { - return err - } - if err := os.Remove(globalPDPath); err != nil { - return err - } - glog.V(2).Infof("Successfully unmounted main device: %s\n", globalPDPath) - - cloud, err := vu.plugin.getCloudProvider() - if err != nil { - return err - } - - if err = cloud.DetachDisk(vu.volPath, ""); err != nil { - return err - } - glog.V(2).Infof("Successfully detached vSphere volume %s", vu.volPath) - return nil + return "", nil } // CreateVolume creates a vSphere volume. func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPath string, volumeSizeKB int, err error) { - cloud, err := v.plugin.getCloudProvider() + cloud, err := getCloudProvider(v.plugin.host.GetCloudProvider()) if err != nil { return "", 0, err } @@ -171,7 +72,7 @@ func (util *VsphereDiskUtil) CreateVolume(v *vsphereVolumeProvisioner) (vmDiskPa // DeleteVolume deletes a vSphere volume. func (util *VsphereDiskUtil) DeleteVolume(vd *vsphereVolumeDeleter) error { - cloud, err := vd.plugin.getCloudProvider() + cloud, err := getCloudProvider(vd.plugin.host.GetCloudProvider()) if err != nil { return err } @@ -183,3 +84,25 @@ func (util *VsphereDiskUtil) DeleteVolume(vd *vsphereVolumeDeleter) error { glog.V(2).Infof("Successfully deleted vsphere volume %s", vd.volPath) return nil } + +func getVolPathfromDeviceMountPath(deviceMountPath string) string { + // Assumption: No file or folder is named starting with '[' in datastore + volPath := deviceMountPath[strings.LastIndex(deviceMountPath, "["):] + // space between datastore and vmdk name in volumePath is encoded as '\040' when returned by GetMountRefs(). + // volumePath eg: "[local] xxx.vmdk" provided to attach/mount + // replacing \040 with space to match the actual volumePath + return strings.Replace(volPath, "\\040", " ", -1) +} + +func getCloudProvider(cloud cloudprovider.Interface) (*vsphere.VSphere, error) { + if cloud == nil { + glog.Errorf("Cloud provider not initialized properly") + return nil, errors.New("Cloud provider not initialized properly") + } + + vs := cloud.(*vsphere.VSphere) + if vs == nil { + return nil, errors.New("Invalid cloud provider: expected vSphere") + } + return vs, nil +}