diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index f794a28ccf6..9c5d4ad2d20 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -155,14 +155,18 @@ type Volumes interface {
 	// Attach the disk to the specified instance
 	// instanceName can be empty to mean "the instance on which we are running"
 	// Returns the device (e.g. /dev/xvdf) where we attached the volume
-	AttachDisk(instanceName string, volumeName string, readOnly bool) (string, error)
+	AttachDisk(diskName string, instanceName string, readOnly bool) (string, error)

 	// Detach the disk from the specified instance
 	// instanceName can be empty to mean "the instance on which we are running"
-	DetachDisk(instanceName string, volumeName string) error
+	// Returns the device where the volume was attached
+	DetachDisk(diskName string, instanceName string) (string, error)

 	// Create a volume with the specified options
-	CreateVolume(volumeOptions *VolumeOptions) (volumeName string, err error)
-	DeleteVolume(volumeName string) error
+	CreateDisk(volumeOptions *VolumeOptions) (volumeName string, err error)
+	// Delete the specified volume
+	// Returns true iff the volume was deleted
+	// If the volume was not found, returns (false, nil)
+	DeleteDisk(volumeName string) (bool, error)

 	// Get labels to apply to volume on creation
 	GetVolumeLabels(volumeName string) (map[string]string, error)
@@ -201,6 +205,8 @@ type AWSCloud struct {
 	mutex sync.Mutex
 }

+var _ Volumes = &AWSCloud{}
+
 type AWSCloudConfig struct {
 	Global struct {
 		// TODO: Is there any use for this? We can get it from the instance metadata service
@@ -901,7 +907,7 @@ func (self *awsInstance) getInfo() (*ec2.Instance, error) {
 // Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
 // If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
 // Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
-func (self *awsInstance) getMountDevice(volumeID string) (assigned mountDevice, alreadyAttached bool, err error) {
+func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
 	instanceType := self.getInstanceType()
 	if instanceType == nil {
 		return "", false, fmt.Errorf("could not get instance type for instance: %s", self.awsID)
@@ -939,11 +945,17 @@ func (self *awsInstance) getMountDevice(volumeID string) (assigned mountDevice,
 	// Check to see if this volume is already assigned a device on this machine
 	for mountDevice, mappingVolumeID := range self.deviceMappings {
 		if volumeID == mappingVolumeID {
-			glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
+			if assign {
+				glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
+			}
 			return mountDevice, true, nil
 		}
 	}

+	if !assign {
+		return mountDevice(""), false, nil
+	}
+
 	// Check all the valid mountpoints to see if any of them are free
 	valid := instanceType.getEBSMountDevices()
 	chosen := mountDevice("")
@@ -1094,13 +1106,18 @@ func (self *awsDisk) waitForAttachmentStatus(status string) error {
 }

 // Deletes the EBS disk
-func (self *awsDisk) deleteVolume() error {
+func (self *awsDisk) deleteVolume() (bool, error) {
 	request := &ec2.DeleteVolumeInput{VolumeId: aws.String(self.awsID)}
 	_, err := self.ec2.DeleteVolume(request)
 	if err != nil {
-		return fmt.Errorf("error delete EBS volumes: %v", err)
+		if awsError, ok := err.(awserr.Error); ok {
+			if awsError.Code() == "InvalidVolume.NotFound" {
+				return false, nil
+			}
+		}
+		return false, fmt.Errorf("error deleting EBS volume: %v", err)
 	}
-	return nil
+	return true, nil
 }

 // Gets the awsInstance for the EC2 instance on which we are running
@@ -1155,7 +1172,7 @@ func (aws *AWSCloud) getAwsInstance(nodeName string) (*awsInstance, error) {
 }

 // Implements Volumes.AttachDisk
-func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly bool) (string, error) {
+func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) {
 	disk, err := newAWSDisk(c, diskName)
 	if err != nil {
 		return "", err
@@ -1172,7 +1189,7 @@ func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly boo
 		return "", errors.New("AWS volumes cannot be mounted read-only")
 	}

-	mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID)
+	mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true)
 	if err != nil {
 		return "", err
 	}
@@ -1220,15 +1237,25 @@ func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly boo
 }

 // Implements Volumes.DetachDisk
-func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
+func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, error) {
 	disk, err := newAWSDisk(aws, diskName)
 	if err != nil {
-		return err
+		return "", err
 	}

 	awsInstance, err := aws.getAwsInstance(instanceName)
 	if err != nil {
-		return err
+		return "", err
+	}
+
+	mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false)
+	if err != nil {
+		return "", err
+	}
+
+	if !alreadyAttached {
+		glog.Warning("DetachDisk called on non-attached disk: ", diskName)
+		// TODO: Continue? Tolerate non-attached error in DetachVolume?
+	}

 	request := ec2.DetachVolumeInput{
@@ -1238,12 +1265,16 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {

 	response, err := aws.ec2.DetachVolume(&request)
 	if err != nil {
-		return fmt.Errorf("error detaching EBS volume: %v", err)
+		return "", fmt.Errorf("error detaching EBS volume: %v", err)
 	}
 	if response == nil {
-		return errors.New("no response from DetachVolume")
+		return "", errors.New("no response from DetachVolume")
 	}

+	// TODO: Fix this - just remove the cache?
+	// If we don't have a cache, we don't have to wait any more (the driver does it for us)
+	// Also, maybe we could get the locally connected drives from the AWS metadata service?
+
 	// At this point we are waiting for the volume to be detached. This
 	// releases the volume and invalidates the cache even when there is a timeout.
 	//
@@ -1253,6 +1284,7 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
 	// works though. An option would be to completely flush the cache upon timeouts.
 	//
 	defer func() {
+		// TODO: Not thread safe?
 		for mountDevice, existingVolumeID := range awsInstance.deviceMappings {
 			if existingVolumeID == disk.awsID {
 				awsInstance.releaseMountDevice(disk.awsID, mountDevice)
@@ -1263,14 +1295,15 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {

 	err = disk.waitForAttachmentStatus("detached")
 	if err != nil {
-		return err
+		return "", err
 	}

-	return err
+	hostDevicePath := "/dev/xvd" + string(mountDevice)
+	return hostDevicePath, err
 }

 // Implements Volumes.CreateVolume
-func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
+func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
 	// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
 	request := &ec2.CreateVolumeInput{}

@@ -1302,7 +1335,7 @@ func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
 	tagRequest.Tags = tags
 	if _, err := s.createTags(tagRequest); err != nil {
 		// delete the volume and hope it succeeds
-		delerr := s.DeleteVolume(volumeName)
+		_, delerr := s.DeleteDisk(volumeName)
 		if delerr != nil {
 			// delete did not succeed, we have a stray volume!
return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr) @@ -1313,11 +1346,11 @@ func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) { return volumeName, nil } -// Implements Volumes.DeleteVolume -func (aws *AWSCloud) DeleteVolume(volumeName string) error { +// Implements Volumes.DeleteDisk +func (aws *AWSCloud) DeleteDisk(volumeName string) (bool, error) { awsDisk, err := newAWSDisk(aws, volumeName) if err != nil { - return err + return false, err } return awsDisk.deleteVolume() } diff --git a/pkg/volume/aws_ebs/aws_ebs.go b/pkg/volume/aws_ebs/aws_ebs.go index 5df20b8d1fb..134f16c74e4 100644 --- a/pkg/volume/aws_ebs/aws_ebs.go +++ b/pkg/volume/aws_ebs/aws_ebs.go @@ -27,7 +27,6 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" - awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/mount" @@ -100,15 +99,15 @@ func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *volume.Spec, return &awsElasticBlockStoreBuilder{ awsElasticBlockStore: &awsElasticBlockStore{ - podUID: podUID, - volName: spec.Name(), - volumeID: volumeID, - manager: manager, - mounter: mounter, - plugin: plugin, + podUID: podUID, + volName: spec.Name(), + volumeID: volumeID, + partition: partition, + manager: manager, + mounter: mounter, + plugin: plugin, }, fsType: fsType, - partition: partition, readOnly: readOnly, diskMounter: &mount.SafeFormatAndMount{plugin.host.GetMounter(), exec.New()}}, nil } @@ -181,6 +180,8 @@ type awsElasticBlockStore struct { podUID types.UID // Unique id of the PD, used to find the disk resource in the provider. volumeID string + // Specifies the partition to mount + partition string // Utility interface that provides API calls to the provider to attach/detach disks. manager ebsManager // Mounter interface that provides system calls to mount the global path to the pod local path. @@ -196,22 +197,10 @@ func detachDiskLogError(ebs *awsElasticBlockStore) { } } -// getVolumeProvider returns the AWS Volumes interface -func (ebs *awsElasticBlockStore) getVolumeProvider() (awscloud.Volumes, error) { - cloud := ebs.plugin.host.GetCloudProvider() - volumes, ok := cloud.(awscloud.Volumes) - if !ok { - return nil, fmt.Errorf("Cloud provider does not support volumes") - } - return volumes, nil -} - type awsElasticBlockStoreBuilder struct { *awsElasticBlockStore // Filesystem type, optional. fsType string - // Specifies the partition to mount - partition string // Specifies whether the disk will be attached as read-only. readOnly bool // diskMounter provides the interface that is used to mount the actual block device. 
@@ -304,6 +293,7 @@ func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string {
 	return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts", name)
 }

+// Reverses the mapping done in makeGlobalPDPath
 func getVolumeIDFromGlobalMount(host volume.VolumeHost, globalPath string) (string, error) {
 	basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts")
 	rel, err := filepath.Rel(basePath, globalPath)
diff --git a/pkg/volume/aws_ebs/aws_ebs_test.go b/pkg/volume/aws_ebs/aws_ebs_test.go
index 313a1d2f8a6..8e81192b5b1 100644
--- a/pkg/volume/aws_ebs/aws_ebs_test.go
+++ b/pkg/volume/aws_ebs/aws_ebs_test.go
@@ -68,11 +68,12 @@ func TestGetAccessModes(t *testing.T) {
 	if err != nil {
 		t.Errorf("Can't find the plugin by name")
 	}
+
 	if !contains(plug.GetAccessModes(), api.ReadWriteOnce) {
-		t.Errorf("Expected to find AccessMode: %s", api.ReadWriteOnce)
+		t.Errorf("Expected to support AccessModeTypes: %s", api.ReadWriteOnce)
 	}
-	if len(plug.GetAccessModes()) != 1 {
-		t.Errorf("Expected to find exactly one AccessMode")
+	if contains(plug.GetAccessModes(), api.ReadOnlyMany) {
+		t.Errorf("Expected not to support AccessModeTypes: %s", api.ReadOnlyMany)
 	}
 }

@@ -85,7 +86,10 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
 	return false
 }

-type fakePDManager struct{}
+type fakePDManager struct {
+	attachCalled bool
+	detachCalled bool
+}

 // TODO(jonesdl) To fully test this, we could create a loopback device
 // and mount that instead.
@@ -95,6 +99,10 @@ func (fake *fakePDManager) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, gl
 	if err != nil {
 		return err
 	}
+	fake.attachCalled = true
+	// Simulate the global mount so that the fakeMounter returns the
+	// expected number of mounts for the attached disk.
+	b.mounter.Mount(globalPath, globalPath, b.fsType, nil)
 	return nil
 }

@@ -104,6 +112,7 @@ func (fake *fakePDManager) DetachDisk(c *awsElasticBlockStoreCleaner) error {
 	if err != nil {
 		return err
 	}
+	fake.detachCalled = true
 	return nil
 }

@@ -121,7 +130,7 @@ func (fake *fakePDManager) DeleteVolume(cd *awsElasticBlockStoreDeleter) error {
 func TestPlugin(t *testing.T) {
 	tmpDir, err := utiltesting.MkTmpdir("awsebsTest")
 	if err != nil {
-		t.Fatalf("can't make a temp dir: %v")
+		t.Fatalf("can't make a temp dir: %v", err)
 	}
 	defer os.RemoveAll(tmpDir)
 	plugMgr := volume.VolumePluginMgr{}
@@ -140,13 +149,16 @@ func TestPlugin(t *testing.T) {
 		},
 	}

-	builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
+	fakeManager := &fakePDManager{}
+	fakeMounter := &mount.FakeMounter{}
+	builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter)
 	if err != nil {
 		t.Errorf("Failed to make a new Builder: %v", err)
 	}
 	if builder == nil {
 		t.Errorf("Got a nil Builder")
 	}
+
 	volPath := path.Join(tmpDir, "pods/poduid/volumes/kubernetes.io~aws-ebs/vol1")
 	path := builder.GetPath()
 	if path != volPath {
@@ -170,8 +182,12 @@ func TestPlugin(t *testing.T) {
 			t.Errorf("SetUp() failed: %v", err)
 		}
 	}
+	if !fakeManager.attachCalled {
+		t.Errorf("Attach watch not called")
+	}

-	cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
+	fakeManager = &fakePDManager{}
+	cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
 	if err != nil {
 		t.Errorf("Failed to make a new Cleaner: %v", err)
 	}
@@ -187,9 +203,12 @@ func TestPlugin(t *testing.T) {
 	} else if !os.IsNotExist(err) {
 		t.Errorf("SetUp() failed: %v", err)
 	}
+	if !fakeManager.detachCalled {
+		t.Errorf("Detach watch not called")
+	}

 	// Test Provisioner
-	cap := resource.MustParse("100Gi")
+	cap := resource.MustParse("100Mi")
 	options := volume.VolumeOptions{
 		Capacity: cap,
 		AccessModes: []api.PersistentVolumeAccessMode{
diff --git a/pkg/volume/aws_ebs/aws_util.go b/pkg/volume/aws_ebs/aws_util.go
index 2f188c17070..083842db63f 100644
--- a/pkg/volume/aws_ebs/aws_util.go
+++ b/pkg/volume/aws_ebs/aws_util.go
@@ -17,47 +17,56 @@ limitations under the License.

 package aws_ebs

 import (
-	"errors"
+	"fmt"
 	"os"
+	"path/filepath"
 	"time"

 	"github.com/golang/glog"
-	aws_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
+	"k8s.io/kubernetes/pkg/util/keymutex"
+	"k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/volume"
 )

+const (
+	diskPartitionSuffix = ""
+	diskXVDPath         = "/dev/xvd"
+	diskXVDPattern      = "/dev/xvd*"
+	maxChecks           = 60
+	maxRetries          = 10
+	checkSleepDuration  = time.Second
+	errorSleepDuration  = 5 * time.Second
+)
+
+// Singleton key mutex for keeping attach/detach operations for the same PD atomic
+var attachDetachMutex = keymutex.NewKeyMutex()
+
 type AWSDiskUtil struct{}

-// Attaches a disk specified by a volume.AWSElasticBlockStore to the current kubelet.
+// Attaches a disk to the current kubelet.
 // Mounts the disk to its global path.
-func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
-	volumes, err := b.getVolumeProvider()
+func (diskUtil *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
+	glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
+
+	// Block execution until any pending detach operations for this PD have completed
+	attachDetachMutex.LockKey(b.volumeID)
+	defer attachDetachMutex.UnlockKey(b.volumeID)
+
+	glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
+
+	xvdBefore, err := filepath.Glob(diskXVDPattern)
+	if err != nil {
+		glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskXVDPattern, err)
+	}
+	xvdBeforeSet := sets.NewString(xvdBefore...)
+
+	devicePath, err := attachDiskAndVerify(b, xvdBeforeSet)
 	if err != nil {
 		return err
 	}
-	devicePath, err := volumes.AttachDisk("", b.volumeID, b.readOnly)
-	if err != nil {
-		return err
-	}
-	if b.partition != "" {
-		devicePath = devicePath + b.partition
-	}
-	//TODO(jonesdl) There should probably be better method than busy-waiting here.
-	numTries := 0
-	for {
-		_, err := os.Stat(devicePath)
-		if err == nil {
-			break
-		}
-		if err != nil && !os.IsNotExist(err) {
-			return err
-		}
-		numTries++
-		if numTries == 10 {
-			return errors.New("Could not attach disk: Timeout after 10s (" + devicePath + ")")
-		}
-		time.Sleep(time.Second)
-	}

 	// Only mount the PD globally once.
 	notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
@@ -87,64 +96,239 @@ func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, glob

 // Unmounts the device and detaches the disk from the kubelet's host machine.
 func (util *AWSDiskUtil) DetachDisk(c *awsElasticBlockStoreCleaner) error {
-	// Unmount the global PD mount, which should be the only one.
-	globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
-	if err := c.mounter.Unmount(globalPDPath); err != nil {
-		glog.V(2).Info("Error unmount dir ", globalPDPath, ": ", err)
-		return err
-	}
-	if err := os.Remove(globalPDPath); err != nil {
-		glog.V(2).Info("Error removing dir ", globalPDPath, ": ", err)
-		return err
-	}
-	// Detach the disk
-	volumes, err := c.getVolumeProvider()
-	if err != nil {
-		glog.V(2).Info("Error getting volume provider for volumeID ", c.volumeID, ": ", err)
-		return err
-	}
-	if err := volumes.DetachDisk("", c.volumeID); err != nil {
-		glog.V(2).Info("Error detaching disk ", c.volumeID, ": ", err)
-		return err
+	glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.volumeID)
+
+	if err := unmountPDAndRemoveGlobalPath(c); err != nil {
+		glog.Errorf("Error unmounting PD %q: %v", c.volumeID, err)
 	}
+
+	// Detach disk asynchronously so that the kubelet sync loop is not blocked.
+	go detachDiskAndVerify(c)
 	return nil
 }

 func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
-	volumes, err := d.getVolumeProvider()
+	cloud, err := getCloudProvider()
 	if err != nil {
-		glog.V(2).Info("Error getting volume provider: ", err)
 		return err
 	}
-	if err := volumes.DeleteVolume(d.volumeID); err != nil {
-		glog.V(2).Infof("Error deleting AWS EBS volume %s: %v", d.volumeID, err)
+
+	deleted, err := cloud.DeleteDisk(d.volumeID)
+	if err != nil {
+		glog.V(2).Infof("Error deleting EBS Disk volume %s: %v", d.volumeID, err)
 		return err
 	}
-	glog.V(2).Infof("Successfully deleted AWS EBS volume %s", d.volumeID)
+	if deleted {
+		glog.V(2).Infof("Successfully deleted EBS Disk volume %s", d.volumeID)
+	} else {
+		glog.V(2).Infof("Successfully deleted EBS Disk volume %s (actually already deleted)", d.volumeID)
+	}
 	return nil
 }

 func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, err error) {
-	volumes, err := c.getVolumeProvider()
+	cloud, err := getCloudProvider()
 	if err != nil {
-		glog.V(2).Info("Error getting volume provider: ", err)
 		return "", 0, err
 	}

 	requestBytes := c.options.Capacity.Value()
-	// AWS works with gigabytes, convert to GiB with rounding up
-	requestGB := int(volume.RoundUpSize(requestBytes, 1024*1024*1024))
-	volSpec := &aws_cloud.VolumeOptions{
-		CapacityGB: requestGB,
-		Tags:       c.options.CloudTags,
-	}
+	// The cloud provider works with gigabytes, convert to GiB with rounding up
+	requestGB := volume.RoundUpSize(requestBytes, 1024*1024*1024)

-	name, err := volumes.CreateVolume(volSpec)
+	volumeOptions := &aws.VolumeOptions{}
+	volumeOptions.CapacityGB = int(requestGB)
+
+	name, err := cloud.CreateDisk(volumeOptions)
 	if err != nil {
-		glog.V(2).Infof("Error creating AWS EBS volume: %v", err)
+		glog.V(2).Infof("Error creating EBS Disk volume: %v", err)
 		return "", 0, err
 	}
-	glog.V(2).Infof("Successfully created AWS EBS volume %s", name)
-	return name, requestGB, nil
+	glog.V(2).Infof("Successfully created EBS Disk volume %s", name)
+	return name, int(requestGB), nil
+}
+
+// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails.
+func attachDiskAndVerify(b *awsElasticBlockStoreBuilder, xvdBeforeSet sets.String) (string, error) {
+	var awsCloud *aws.AWSCloud
+	for numRetries := 0; numRetries < maxRetries; numRetries++ {
+		var err error
+		if awsCloud == nil {
+			awsCloud, err = getCloudProvider()
+			if err != nil || awsCloud == nil {
+				// Retry on error. See issue #11321
+				glog.Errorf("Error getting AWSCloudProvider while attaching PD %q: %v", b.volumeID, err)
+				time.Sleep(errorSleepDuration)
+				continue
+			}
+		}
+
+		if numRetries > 0 {
+			glog.Warningf("Retrying attach for EBS Disk %q (retry count=%v).", b.volumeID, numRetries)
+		}
+
+		devicePath, err := awsCloud.AttachDisk(b.volumeID, b.plugin.host.GetHostName(), b.readOnly)
+		if err != nil {
+			glog.Errorf("Error attaching PD %q: %v", b.volumeID, err)
+			time.Sleep(errorSleepDuration)
+			continue
+		}
+
+		devicePaths := getDiskByIdPaths(b.awsElasticBlockStore, devicePath)
+
+		for numChecks := 0; numChecks < maxChecks; numChecks++ {
+			path, err := verifyDevicePath(devicePaths)
+			if err != nil {
+				// Log error, if any, and continue checking periodically. See issue #11321
+				glog.Errorf("Error verifying EBS Disk (%q) is attached: %v", b.volumeID, err)
+			} else if path != "" {
+				// A device path has successfully been created for the PD
+				glog.Infof("Successfully attached EBS Disk %q.", b.volumeID)
+				return path, nil
+			}
+
+			// Sleep then check again
+			glog.V(3).Infof("Waiting for EBS Disk %q to attach.", b.volumeID)
+			time.Sleep(checkSleepDuration)
+		}
+	}
+
+	return "", fmt.Errorf("Could not attach EBS Disk %q. Timeout waiting for mount paths to be created.", b.volumeID)
+}
+
+// Returns the first path that exists, or empty string if none exist.
+func verifyDevicePath(devicePaths []string) (string, error) {
+	for _, path := range devicePaths {
+		if pathExists, err := pathExists(path); err != nil {
+			return "", fmt.Errorf("Error checking if path exists: %v", err)
+		} else if pathExists {
+			return path, nil
+		}
+	}
+
+	return "", nil
+}
+
+// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
+// This function is intended to be called asynchronously as a go routine.
+func detachDiskAndVerify(c *awsElasticBlockStoreCleaner) {
+	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.volumeID)
+	defer runtime.HandleCrash()
+
+	// Block execution until any pending attach/detach operations for this PD have completed
+	attachDetachMutex.LockKey(c.volumeID)
+	defer attachDetachMutex.UnlockKey(c.volumeID)
+
+	glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.volumeID)
+
+	var awsCloud *aws.AWSCloud
+	for numRetries := 0; numRetries < maxRetries; numRetries++ {
+		var err error
+		if awsCloud == nil {
+			awsCloud, err = getCloudProvider()
+			if err != nil || awsCloud == nil {
+				// Retry on error. See issue #11321
+				glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", c.volumeID, err)
+				time.Sleep(errorSleepDuration)
+				continue
+			}
+		}
+
+		if numRetries > 0 {
+			glog.Warningf("Retrying detach for EBS Disk %q (retry count=%v).", c.volumeID, numRetries)
+		}
+
+		devicePath, err := awsCloud.DetachDisk(c.volumeID, c.plugin.host.GetHostName())
+		if err != nil {
+			glog.Errorf("Error detaching PD %q: %v", c.volumeID, err)
+			time.Sleep(errorSleepDuration)
+			continue
+		}
+
+		devicePaths := getDiskByIdPaths(c.awsElasticBlockStore, devicePath)
+
+		for numChecks := 0; numChecks < maxChecks; numChecks++ {
+			allPathsRemoved, err := verifyAllPathsRemoved(devicePaths)
+			if err != nil {
+				// Log error, if any, and continue checking periodically.
+				glog.Errorf("Error verifying EBS Disk (%q) is detached: %v", c.volumeID, err)
+			} else if allPathsRemoved {
+				// All paths to the PD have been successfully removed
+				unmountPDAndRemoveGlobalPath(c)
+				glog.Infof("Successfully detached EBS Disk %q.", c.volumeID)
+				return
+			}
+
+			// Sleep then check again
+			glog.V(3).Infof("Waiting for EBS Disk %q to detach.", c.volumeID)
+			time.Sleep(checkSleepDuration)
+		}
+	}
+
+	glog.Errorf("Failed to detach EBS Disk %q. One or more mount paths were not removed.", c.volumeID)
+}
+
+// Unmount the global PD mount, which should be the only one, and delete it.
+func unmountPDAndRemoveGlobalPath(c *awsElasticBlockStoreCleaner) error {
+	globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)

+	err := c.mounter.Unmount(globalPDPath)
+	os.Remove(globalPDPath)
+	return err
+}
+
+// Returns true if all the given paths have been removed, false otherwise.
+func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
+	allPathsRemoved := true
+	for _, path := range devicePaths {
+		if exists, err := pathExists(path); err != nil {
+			return false, fmt.Errorf("Error checking if path exists: %v", err)
+		} else {
+			allPathsRemoved = allPathsRemoved && !exists
+		}
+	}
+
+	return allPathsRemoved, nil
+}
+
+// Returns a list of all paths for the given EBS mount
+// This is more interesting on GCE (where we are able to identify volumes under /dev/disk/by-id)
+// Here it is mostly about applying the partition path
+func getDiskByIdPaths(d *awsElasticBlockStore, devicePath string) []string {
+	devicePaths := []string{}
+	if devicePath != "" {
+		devicePaths = append(devicePaths, devicePath)
+	}
+
+	if d.partition != "" {
+		for i, path := range devicePaths {
+			devicePaths[i] = path + diskPartitionSuffix + d.partition
+		}
+	}
+
+	return devicePaths
+}
+
+// Checks if the specified path exists
+func pathExists(path string) (bool, error) {
+	_, err := os.Stat(path)
+	if err == nil {
+		return true, nil
+	} else if os.IsNotExist(err) {
+		return false, nil
+	} else {
+		return false, err
+	}
+}
+
+// Return the cloud provider
+func getCloudProvider() (*aws.AWSCloud, error) {
+	awsCloudProvider, err := cloudprovider.GetCloudProvider("aws", nil)
+	if err != nil || awsCloudProvider == nil {
+		return nil, err
+	}
+
+	// The conversion must be safe, otherwise there is a bug in GetCloudProvider()
+	return awsCloudProvider.(*aws.AWSCloud), nil
+}
diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go
index 2a9e13e103f..36719ab92ee 100644
--- a/pkg/volume/gce_pd/gce_util.go
+++ b/pkg/volume/gce_pd/gce_util.go
@@ -262,7 +262,7 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
 			// Log error, if any, and continue checking periodically.
glog.Errorf("Error verifying GCE PD (%q) is detached: %v", c.pdName, err) } else if allPathsRemoved { - // All paths to the PD have been succefully removed + // All paths to the PD have been successfully removed unmountPDAndRemoveGlobalPath(c) glog.Infof("Successfully detached GCE PD %q.", c.pdName) return diff --git a/plugin/pkg/admission/persistentvolume/label/admission_test.go b/plugin/pkg/admission/persistentvolume/label/admission_test.go index def17508284..934f3be78de 100644 --- a/plugin/pkg/admission/persistentvolume/label/admission_test.go +++ b/plugin/pkg/admission/persistentvolume/label/admission_test.go @@ -33,20 +33,20 @@ type mockVolumes struct { var _ aws.Volumes = &mockVolumes{} -func (v *mockVolumes) AttachDisk(instanceName string, volumeName string, readOnly bool) (string, error) { +func (v *mockVolumes) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) { return "", fmt.Errorf("not implemented") } -func (v *mockVolumes) DetachDisk(instanceName string, volumeName string) error { - return fmt.Errorf("not implemented") -} - -func (v *mockVolumes) CreateVolume(volumeOptions *aws.VolumeOptions) (volumeName string, err error) { +func (v *mockVolumes) DetachDisk(diskName string, instanceName string) (string, error) { return "", fmt.Errorf("not implemented") } -func (v *mockVolumes) DeleteVolume(volumeName string) error { - return fmt.Errorf("not implemented") +func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName string, err error) { + return "", fmt.Errorf("not implemented") +} + +func (v *mockVolumes) DeleteDisk(volumeName string) (bool, error) { + return false, fmt.Errorf("not implemented") } func (v *mockVolumes) GetVolumeLabels(volumeName string) (map[string]string, error) { diff --git a/test/e2e/pd.go b/test/e2e/pd.go index 2cd8cb4f0cc..42d63cb8eab 100644 --- a/test/e2e/pd.go +++ b/test/e2e/pd.go @@ -327,7 +327,7 @@ func createPD() (string, error) { } volumeOptions := &awscloud.VolumeOptions{} volumeOptions.CapacityGB = 10 - return volumes.CreateVolume(volumeOptions) + return volumes.CreateDisk(volumeOptions) } } @@ -354,7 +354,15 @@ func deletePD(pdName string) error { if !ok { return fmt.Errorf("Provider does not support volumes") } - return volumes.DeleteVolume(pdName) + deleted, err := volumes.DeleteDisk(pdName) + if err != nil { + return err + } else { + if !deleted { + Logf("Volume deletion implicitly succeeded because volume %q does not exist.", pdName) + } + return nil + } } } @@ -384,7 +392,8 @@ func detachPD(hostName, pdName string) error { if !ok { return fmt.Errorf("Provider does not support volumes") } - return volumes.DetachDisk(hostName, pdName) + _, err := volumes.DetachDisk(pdName, hostName) + return err } }