mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #15938 from justinsb/aws_ebs_cleanup
Auto commit by PR queue bot
This commit is contained in:
commit
fec0d127b3
@ -155,14 +155,18 @@ type Volumes interface {
|
||||
// Attach the disk to the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
// Returns the device (e.g. /dev/xvdf) where we attached the volume
|
||||
AttachDisk(instanceName string, volumeName string, readOnly bool) (string, error)
|
||||
AttachDisk(diskName string, instanceName string, readOnly bool) (string, error)
|
||||
// Detach the disk from the specified instance
|
||||
// instanceName can be empty to mean "the instance on which we are running"
|
||||
DetachDisk(instanceName string, volumeName string) error
|
||||
// Returns the device where the volume was attached
|
||||
DetachDisk(diskName string, instanceName string) (string, error)
|
||||
|
||||
// Create a volume with the specified options
|
||||
CreateVolume(volumeOptions *VolumeOptions) (volumeName string, err error)
|
||||
DeleteVolume(volumeName string) error
|
||||
CreateDisk(volumeOptions *VolumeOptions) (volumeName string, err error)
|
||||
// Delete the specified volume
|
||||
// Returns true iff the volume was deleted
|
||||
// If the was not found, returns (false, nil)
|
||||
DeleteDisk(volumeName string) (bool, error)
|
||||
|
||||
// Get labels to apply to volume on creation
|
||||
GetVolumeLabels(volumeName string) (map[string]string, error)
|
||||
@ -201,6 +205,8 @@ type AWSCloud struct {
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
var _ Volumes = &AWSCloud{}
|
||||
|
||||
type AWSCloudConfig struct {
|
||||
Global struct {
|
||||
// TODO: Is there any use for this? We can get it from the instance metadata service
|
||||
@ -901,7 +907,7 @@ func (self *awsInstance) getInfo() (*ec2.Instance, error) {
|
||||
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
|
||||
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
|
||||
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
|
||||
func (self *awsInstance) getMountDevice(volumeID string) (assigned mountDevice, alreadyAttached bool, err error) {
|
||||
func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
|
||||
instanceType := self.getInstanceType()
|
||||
if instanceType == nil {
|
||||
return "", false, fmt.Errorf("could not get instance type for instance: %s", self.awsID)
|
||||
@ -939,11 +945,17 @@ func (self *awsInstance) getMountDevice(volumeID string) (assigned mountDevice,
|
||||
// Check to see if this volume is already assigned a device on this machine
|
||||
for mountDevice, mappingVolumeID := range self.deviceMappings {
|
||||
if volumeID == mappingVolumeID {
|
||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
|
||||
if assign {
|
||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
|
||||
}
|
||||
return mountDevice, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
if !assign {
|
||||
return mountDevice(""), false, nil
|
||||
}
|
||||
|
||||
// Check all the valid mountpoints to see if any of them are free
|
||||
valid := instanceType.getEBSMountDevices()
|
||||
chosen := mountDevice("")
|
||||
@ -1094,13 +1106,18 @@ func (self *awsDisk) waitForAttachmentStatus(status string) error {
|
||||
}
|
||||
|
||||
// Deletes the EBS disk
|
||||
func (self *awsDisk) deleteVolume() error {
|
||||
func (self *awsDisk) deleteVolume() (bool, error) {
|
||||
request := &ec2.DeleteVolumeInput{VolumeId: aws.String(self.awsID)}
|
||||
_, err := self.ec2.DeleteVolume(request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error delete EBS volumes: %v", err)
|
||||
if awsError, ok := err.(awserr.Error); ok {
|
||||
if awsError.Code() == "InvalidVolume.NotFound" {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return false, fmt.Errorf("error deleting EBS volumes: %v", err)
|
||||
}
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Gets the awsInstance for the EC2 instance on which we are running
|
||||
@ -1155,7 +1172,7 @@ func (aws *AWSCloud) getAwsInstance(nodeName string) (*awsInstance, error) {
|
||||
}
|
||||
|
||||
// Implements Volumes.AttachDisk
|
||||
func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly bool) (string, error) {
|
||||
func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) {
|
||||
disk, err := newAWSDisk(c, diskName)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -1172,7 +1189,7 @@ func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly boo
|
||||
return "", errors.New("AWS volumes cannot be mounted read-only")
|
||||
}
|
||||
|
||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID)
|
||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -1220,15 +1237,25 @@ func (c *AWSCloud) AttachDisk(instanceName string, diskName string, readOnly boo
|
||||
}
|
||||
|
||||
// Implements Volumes.DetachDisk
|
||||
func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, error) {
|
||||
disk, err := newAWSDisk(aws, diskName)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
awsInstance, err := aws.getAwsInstance(instanceName)
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if !alreadyAttached {
|
||||
glog.Warning("DetachDisk called on non-attached disk: ", diskName)
|
||||
// TODO: Continue? Tolerate non-attached error in DetachVolume?
|
||||
}
|
||||
|
||||
request := ec2.DetachVolumeInput{
|
||||
@ -1238,12 +1265,16 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
|
||||
response, err := aws.ec2.DetachVolume(&request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error detaching EBS volume: %v", err)
|
||||
return "", fmt.Errorf("error detaching EBS volume: %v", err)
|
||||
}
|
||||
if response == nil {
|
||||
return errors.New("no response from DetachVolume")
|
||||
return "", errors.New("no response from DetachVolume")
|
||||
}
|
||||
|
||||
// TODO: Fix this - just remove the cache?
|
||||
// If we don't have a cache; we don't have to wait any more (the driver does it for us)
|
||||
// Also, maybe we could get the locally connected drivers from the AWS metadata service?
|
||||
|
||||
// At this point we are waiting for the volume being detached. This
|
||||
// releases the volume and invalidates the cache even when there is a timeout.
|
||||
//
|
||||
@ -1253,6 +1284,7 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
// works though. An option would be to completely flush the cache upon timeouts.
|
||||
//
|
||||
defer func() {
|
||||
// TODO: Not thread safe?
|
||||
for mountDevice, existingVolumeID := range awsInstance.deviceMappings {
|
||||
if existingVolumeID == disk.awsID {
|
||||
awsInstance.releaseMountDevice(disk.awsID, mountDevice)
|
||||
@ -1263,14 +1295,15 @@ func (aws *AWSCloud) DetachDisk(instanceName string, diskName string) error {
|
||||
|
||||
err = disk.waitForAttachmentStatus("detached")
|
||||
if err != nil {
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
|
||||
return err
|
||||
hostDevicePath := "/dev/xvd" + string(mountDevice)
|
||||
return hostDevicePath, err
|
||||
}
|
||||
|
||||
// Implements Volumes.CreateVolume
|
||||
func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
||||
func (s *AWSCloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
|
||||
// TODO: Should we tag this with the cluster id (so it gets deleted when the cluster does?)
|
||||
|
||||
request := &ec2.CreateVolumeInput{}
|
||||
@ -1302,7 +1335,7 @@ func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
||||
tagRequest.Tags = tags
|
||||
if _, err := s.createTags(tagRequest); err != nil {
|
||||
// delete the volume and hope it succeeds
|
||||
delerr := s.DeleteVolume(volumeName)
|
||||
_, delerr := s.DeleteDisk(volumeName)
|
||||
if delerr != nil {
|
||||
// delete did not succeed, we have a stray volume!
|
||||
return "", fmt.Errorf("error tagging volume %s, could not delete the volume: %v", volumeName, delerr)
|
||||
@ -1313,11 +1346,11 @@ func (s *AWSCloud) CreateVolume(volumeOptions *VolumeOptions) (string, error) {
|
||||
return volumeName, nil
|
||||
}
|
||||
|
||||
// Implements Volumes.DeleteVolume
|
||||
func (aws *AWSCloud) DeleteVolume(volumeName string) error {
|
||||
// Implements Volumes.DeleteDisk
|
||||
func (aws *AWSCloud) DeleteDisk(volumeName string) (bool, error) {
|
||||
awsDisk, err := newAWSDisk(aws, volumeName)
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
return awsDisk.deleteVolume()
|
||||
}
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
@ -100,15 +99,15 @@ func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *volume.Spec,
|
||||
|
||||
return &awsElasticBlockStoreBuilder{
|
||||
awsElasticBlockStore: &awsElasticBlockStore{
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
volumeID: volumeID,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
podUID: podUID,
|
||||
volName: spec.Name(),
|
||||
volumeID: volumeID,
|
||||
partition: partition,
|
||||
manager: manager,
|
||||
mounter: mounter,
|
||||
plugin: plugin,
|
||||
},
|
||||
fsType: fsType,
|
||||
partition: partition,
|
||||
readOnly: readOnly,
|
||||
diskMounter: &mount.SafeFormatAndMount{plugin.host.GetMounter(), exec.New()}}, nil
|
||||
}
|
||||
@ -181,6 +180,8 @@ type awsElasticBlockStore struct {
|
||||
podUID types.UID
|
||||
// Unique id of the PD, used to find the disk resource in the provider.
|
||||
volumeID string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Utility interface that provides API calls to the provider to attach/detach disks.
|
||||
manager ebsManager
|
||||
// Mounter interface that provides system calls to mount the global path to the pod local path.
|
||||
@ -196,22 +197,10 @@ func detachDiskLogError(ebs *awsElasticBlockStore) {
|
||||
}
|
||||
}
|
||||
|
||||
// getVolumeProvider returns the AWS Volumes interface
|
||||
func (ebs *awsElasticBlockStore) getVolumeProvider() (awscloud.Volumes, error) {
|
||||
cloud := ebs.plugin.host.GetCloudProvider()
|
||||
volumes, ok := cloud.(awscloud.Volumes)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Cloud provider does not support volumes")
|
||||
}
|
||||
return volumes, nil
|
||||
}
|
||||
|
||||
type awsElasticBlockStoreBuilder struct {
|
||||
*awsElasticBlockStore
|
||||
// Filesystem type, optional.
|
||||
fsType string
|
||||
// Specifies the partition to mount
|
||||
partition string
|
||||
// Specifies whether the disk will be attached as read-only.
|
||||
readOnly bool
|
||||
// diskMounter provides the interface that is used to mount the actual block device.
|
||||
@ -304,6 +293,7 @@ func makeGlobalPDPath(host volume.VolumeHost, volumeID string) string {
|
||||
return path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts", name)
|
||||
}
|
||||
|
||||
// Reverses the mapping done in makeGlobalPDPath
|
||||
func getVolumeIDFromGlobalMount(host volume.VolumeHost, globalPath string) (string, error) {
|
||||
basePath := path.Join(host.GetPluginDir(awsElasticBlockStorePluginName), "mounts")
|
||||
rel, err := filepath.Rel(basePath, globalPath)
|
||||
|
@ -68,11 +68,12 @@ func TestGetAccessModes(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Errorf("Can't find the plugin by name")
|
||||
}
|
||||
|
||||
if !contains(plug.GetAccessModes(), api.ReadWriteOnce) {
|
||||
t.Errorf("Expected to find AccessMode: %s", api.ReadWriteOnce)
|
||||
t.Errorf("Expected to support AccessModeTypes: %s", api.ReadWriteOnce)
|
||||
}
|
||||
if len(plug.GetAccessModes()) != 1 {
|
||||
t.Errorf("Expected to find exactly one AccessMode")
|
||||
if contains(plug.GetAccessModes(), api.ReadOnlyMany) {
|
||||
t.Errorf("Expected not to support AccessModeTypes: %s", api.ReadOnlyMany)
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,7 +86,10 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
|
||||
return false
|
||||
}
|
||||
|
||||
type fakePDManager struct{}
|
||||
type fakePDManager struct {
|
||||
attachCalled bool
|
||||
detachCalled bool
|
||||
}
|
||||
|
||||
// TODO(jonesdl) To fully test this, we could create a loopback device
|
||||
// and mount that instead.
|
||||
@ -95,6 +99,10 @@ func (fake *fakePDManager) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, gl
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fake.attachCalled = true
|
||||
// Simulate the global mount so that the fakeMounter returns the
|
||||
// expected number of mounts for the attached disk.
|
||||
b.mounter.Mount(globalPath, globalPath, b.fsType, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -104,6 +112,7 @@ func (fake *fakePDManager) DetachDisk(c *awsElasticBlockStoreCleaner) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fake.detachCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -121,7 +130,7 @@ func (fake *fakePDManager) DeleteVolume(cd *awsElasticBlockStoreDeleter) error {
|
||||
func TestPlugin(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("awsebsTest")
|
||||
if err != nil {
|
||||
t.Fatalf("can't make a temp dir: %v")
|
||||
t.Fatalf("can't make a temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
plugMgr := volume.VolumePluginMgr{}
|
||||
@ -140,13 +149,16 @@ func TestPlugin(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
fakeManager := &fakePDManager{}
|
||||
fakeMounter := &mount.FakeMounter{}
|
||||
builder, err := plug.(*awsElasticBlockStorePlugin).newBuilderInternal(volume.NewSpecFromVolume(spec), types.UID("poduid"), fakeManager, fakeMounter)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Builder: %v", err)
|
||||
}
|
||||
if builder == nil {
|
||||
t.Errorf("Got a nil Builder")
|
||||
}
|
||||
|
||||
volPath := path.Join(tmpDir, "pods/poduid/volumes/kubernetes.io~aws-ebs/vol1")
|
||||
path := builder.GetPath()
|
||||
if path != volPath {
|
||||
@ -170,8 +182,12 @@ func TestPlugin(t *testing.T) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
}
|
||||
if !fakeManager.attachCalled {
|
||||
t.Errorf("Attach watch not called")
|
||||
}
|
||||
|
||||
cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), &fakePDManager{}, &mount.FakeMounter{})
|
||||
fakeManager = &fakePDManager{}
|
||||
cleaner, err := plug.(*awsElasticBlockStorePlugin).newCleanerInternal("vol1", types.UID("poduid"), fakeManager, fakeMounter)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to make a new Cleaner: %v", err)
|
||||
}
|
||||
@ -187,9 +203,12 @@ func TestPlugin(t *testing.T) {
|
||||
} else if !os.IsNotExist(err) {
|
||||
t.Errorf("SetUp() failed: %v", err)
|
||||
}
|
||||
if !fakeManager.detachCalled {
|
||||
t.Errorf("Detach watch not called")
|
||||
}
|
||||
|
||||
// Test Provisioner
|
||||
cap := resource.MustParse("100Gi")
|
||||
cap := resource.MustParse("100Mi")
|
||||
options := volume.VolumeOptions{
|
||||
Capacity: cap,
|
||||
AccessModes: []api.PersistentVolumeAccessMode{
|
||||
|
@ -17,47 +17,56 @@ limitations under the License.
|
||||
package aws_ebs
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
aws_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
|
||||
"k8s.io/kubernetes/pkg/util/keymutex"
|
||||
"k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
)
|
||||
|
||||
const (
|
||||
diskPartitionSuffix = ""
|
||||
diskXVDPath = "/dev/xvd"
|
||||
diskXVDPattern = "/dev/xvd*"
|
||||
maxChecks = 60
|
||||
maxRetries = 10
|
||||
checkSleepDuration = time.Second
|
||||
errorSleepDuration = 5 * time.Second
|
||||
)
|
||||
|
||||
// Singleton key mutex for keeping attach/detach operations for the same PD atomic
|
||||
var attachDetachMutex = keymutex.NewKeyMutex()
|
||||
|
||||
type AWSDiskUtil struct{}
|
||||
|
||||
// Attaches a disk specified by a volume.AWSElasticBlockStore to the current kubelet.
|
||||
// Attaches a disk to the current kubelet.
|
||||
// Mounts the disk to it's global path.
|
||||
func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
|
||||
volumes, err := b.getVolumeProvider()
|
||||
func (diskUtil *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, globalPDPath string) error {
|
||||
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Will block for existing operations, if any. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
|
||||
|
||||
// Block execution until any pending detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(b.volumeID)
|
||||
defer attachDetachMutex.UnlockKey(b.volumeID)
|
||||
|
||||
glog.V(5).Infof("AttachAndMountDisk(...) called for PD %q. Awake and ready to execute. (globalPDPath=%q)\r\n", b.volumeID, globalPDPath)
|
||||
|
||||
xvdBefore, err := filepath.Glob(diskXVDPattern)
|
||||
if err != nil {
|
||||
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskXVDPattern, err)
|
||||
}
|
||||
xvdBeforeSet := sets.NewString(xvdBefore...)
|
||||
|
||||
devicePath, err := attachDiskAndVerify(b, xvdBeforeSet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
devicePath, err := volumes.AttachDisk("", b.volumeID, b.readOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b.partition != "" {
|
||||
devicePath = devicePath + b.partition
|
||||
}
|
||||
//TODO(jonesdl) There should probably be better method than busy-waiting here.
|
||||
numTries := 0
|
||||
for {
|
||||
_, err := os.Stat(devicePath)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
numTries++
|
||||
if numTries == 10 {
|
||||
return errors.New("Could not attach disk: Timeout after 10s (" + devicePath + ")")
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
// Only mount the PD globally once.
|
||||
notMnt, err := b.mounter.IsLikelyNotMountPoint(globalPDPath)
|
||||
@ -87,64 +96,239 @@ func (util *AWSDiskUtil) AttachAndMountDisk(b *awsElasticBlockStoreBuilder, glob
|
||||
|
||||
// Unmounts the device and detaches the disk from the kubelet's host machine.
|
||||
func (util *AWSDiskUtil) DetachDisk(c *awsElasticBlockStoreCleaner) error {
|
||||
// Unmount the global PD mount, which should be the only one.
|
||||
globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
|
||||
if err := c.mounter.Unmount(globalPDPath); err != nil {
|
||||
glog.V(2).Info("Error unmount dir ", globalPDPath, ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(globalPDPath); err != nil {
|
||||
glog.V(2).Info("Error removing dir ", globalPDPath, ": ", err)
|
||||
return err
|
||||
}
|
||||
// Detach the disk
|
||||
volumes, err := c.getVolumeProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider for volumeID ", c.volumeID, ": ", err)
|
||||
return err
|
||||
}
|
||||
if err := volumes.DetachDisk("", c.volumeID); err != nil {
|
||||
glog.V(2).Info("Error detaching disk ", c.volumeID, ": ", err)
|
||||
return err
|
||||
glog.V(5).Infof("DetachDisk(...) for PD %q\r\n", c.volumeID)
|
||||
|
||||
if err := unmountPDAndRemoveGlobalPath(c); err != nil {
|
||||
glog.Errorf("Error unmounting PD %q: %v", c.volumeID, err)
|
||||
}
|
||||
|
||||
// Detach disk asynchronously so that the kubelet sync loop is not blocked.
|
||||
go detachDiskAndVerify(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (util *AWSDiskUtil) DeleteVolume(d *awsElasticBlockStoreDeleter) error {
|
||||
volumes, err := d.getVolumeProvider()
|
||||
cloud, err := getCloudProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := volumes.DeleteVolume(d.volumeID); err != nil {
|
||||
glog.V(2).Infof("Error deleting AWS EBS volume %s: %v", d.volumeID, err)
|
||||
deleted, err := cloud.DeleteDisk(d.volumeID)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Error deleting EBS Disk volume %s: %v", d.volumeID, err)
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Successfully deleted AWS EBS volume %s", d.volumeID)
|
||||
if deleted {
|
||||
glog.V(2).Infof("Successfully deleted EBS Disk volume %s", d.volumeID)
|
||||
} else {
|
||||
glog.V(2).Infof("Successfully deleted EBS Disk volume %s (actually already deleted)", d.volumeID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner) (volumeID string, volumeSizeGB int, err error) {
|
||||
volumes, err := c.getVolumeProvider()
|
||||
cloud, err := getCloudProvider()
|
||||
if err != nil {
|
||||
glog.V(2).Info("Error getting volume provider: ", err)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
requestBytes := c.options.Capacity.Value()
|
||||
// AWS works with gigabytes, convert to GiB with rounding up
|
||||
requestGB := int(volume.RoundUpSize(requestBytes, 1024*1024*1024))
|
||||
volSpec := &aws_cloud.VolumeOptions{
|
||||
CapacityGB: requestGB,
|
||||
Tags: c.options.CloudTags,
|
||||
}
|
||||
// The cloud provider works with gigabytes, convert to GiB with rounding up
|
||||
requestGB := volume.RoundUpSize(requestBytes, 1024*1024*1024)
|
||||
|
||||
name, err := volumes.CreateVolume(volSpec)
|
||||
volumeOptions := &aws.VolumeOptions{}
|
||||
volumeOptions.CapacityGB = int(requestGB)
|
||||
|
||||
name, err := cloud.CreateDisk(volumeOptions)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Error creating AWS EBS volume: %v", err)
|
||||
glog.V(2).Infof("Error creating EBS Disk volume: %v", err)
|
||||
return "", 0, err
|
||||
}
|
||||
glog.V(2).Infof("Successfully created AWS EBS volume %s", name)
|
||||
return name, requestGB, nil
|
||||
glog.V(2).Infof("Successfully created EBS Disk volume %s", name)
|
||||
return name, int(requestGB), nil
|
||||
}
|
||||
|
||||
// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails.
|
||||
func attachDiskAndVerify(b *awsElasticBlockStoreBuilder, xvdBeforeSet sets.String) (string, error) {
|
||||
var awsCloud *aws.AWSCloud
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
var err error
|
||||
if awsCloud == nil {
|
||||
awsCloud, err = getCloudProvider()
|
||||
if err != nil || awsCloud == nil {
|
||||
// Retry on error. See issue #11321
|
||||
glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", b.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying attach for EBS Disk %q (retry count=%v).", b.volumeID, numRetries)
|
||||
}
|
||||
|
||||
devicePath, err := awsCloud.AttachDisk(b.volumeID, b.plugin.host.GetHostName(), b.readOnly)
|
||||
if err != nil {
|
||||
glog.Errorf("Error attaching PD %q: %v", b.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
devicePaths := getDiskByIdPaths(b.awsElasticBlockStore, devicePath)
|
||||
|
||||
for numChecks := 0; numChecks < maxChecks; numChecks++ {
|
||||
path, err := verifyDevicePath(devicePaths)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically. See issue #11321
|
||||
glog.Errorf("Error verifying EBS Disk (%q) is attached: %v", b.volumeID, err)
|
||||
} else if path != "" {
|
||||
// A device path has successfully been created for the PD
|
||||
glog.Infof("Successfully attached EBS Disk %q.", b.volumeID)
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// Sleep then check again
|
||||
glog.V(3).Infof("Waiting for EBS Disk %q to attach.", b.volumeID)
|
||||
time.Sleep(checkSleepDuration)
|
||||
}
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("Could not attach EBS Disk %q. Timeout waiting for mount paths to be created.", b.volumeID)
|
||||
}
|
||||
|
||||
// Returns the first path that exists, or empty string if none exist.
|
||||
func verifyDevicePath(devicePaths []string) (string, error) {
|
||||
for _, path := range devicePaths {
|
||||
if pathExists, err := pathExists(path); err != nil {
|
||||
return "", fmt.Errorf("Error checking if path exists: %v", err)
|
||||
} else if pathExists {
|
||||
return path, nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails.
|
||||
// This function is intended to be called asynchronously as a go routine.
|
||||
func detachDiskAndVerify(c *awsElasticBlockStoreCleaner) {
|
||||
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Will block for pending operations", c.volumeID)
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
// Block execution until any pending attach/detach operations for this PD have completed
|
||||
attachDetachMutex.LockKey(c.volumeID)
|
||||
defer attachDetachMutex.UnlockKey(c.volumeID)
|
||||
|
||||
glog.V(5).Infof("detachDiskAndVerify(...) for pd %q. Awake and ready to execute.", c.volumeID)
|
||||
|
||||
var awsCloud *aws.AWSCloud
|
||||
for numRetries := 0; numRetries < maxRetries; numRetries++ {
|
||||
var err error
|
||||
if awsCloud == nil {
|
||||
awsCloud, err = getCloudProvider()
|
||||
if err != nil || awsCloud == nil {
|
||||
// Retry on error. See issue #11321
|
||||
glog.Errorf("Error getting AWSCloudProvider while detaching PD %q: %v", c.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if numRetries > 0 {
|
||||
glog.Warningf("Retrying detach for EBS Disk %q (retry count=%v).", c.volumeID, numRetries)
|
||||
}
|
||||
|
||||
devicePath, err := awsCloud.DetachDisk(c.volumeID, c.plugin.host.GetHostName())
|
||||
if err != nil {
|
||||
glog.Errorf("Error detaching PD %q: %v", c.volumeID, err)
|
||||
time.Sleep(errorSleepDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
devicePaths := getDiskByIdPaths(c.awsElasticBlockStore, devicePath)
|
||||
|
||||
for numChecks := 0; numChecks < maxChecks; numChecks++ {
|
||||
allPathsRemoved, err := verifyAllPathsRemoved(devicePaths)
|
||||
if err != nil {
|
||||
// Log error, if any, and continue checking periodically.
|
||||
glog.Errorf("Error verifying EBS Disk (%q) is detached: %v", c.volumeID, err)
|
||||
} else if allPathsRemoved {
|
||||
// All paths to the PD have been successfully removed
|
||||
unmountPDAndRemoveGlobalPath(c)
|
||||
glog.Infof("Successfully detached EBS Disk %q.", c.volumeID)
|
||||
return
|
||||
}
|
||||
|
||||
// Sleep then check again
|
||||
glog.V(3).Infof("Waiting for EBS Disk %q to detach.", c.volumeID)
|
||||
time.Sleep(checkSleepDuration)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
glog.Errorf("Failed to detach EBS Disk %q. One or more mount paths was not removed.", c.volumeID)
|
||||
}
|
||||
|
||||
// Unmount the global PD mount, which should be the only one, and delete it.
|
||||
func unmountPDAndRemoveGlobalPath(c *awsElasticBlockStoreCleaner) error {
|
||||
globalPDPath := makeGlobalPDPath(c.plugin.host, c.volumeID)
|
||||
|
||||
err := c.mounter.Unmount(globalPDPath)
|
||||
os.Remove(globalPDPath)
|
||||
return err
|
||||
}
|
||||
|
||||
// Returns the first path that exists, or empty string if none exist.
|
||||
func verifyAllPathsRemoved(devicePaths []string) (bool, error) {
|
||||
allPathsRemoved := true
|
||||
for _, path := range devicePaths {
|
||||
if exists, err := pathExists(path); err != nil {
|
||||
return false, fmt.Errorf("Error checking if path exists: %v", err)
|
||||
} else {
|
||||
allPathsRemoved = allPathsRemoved && !exists
|
||||
}
|
||||
}
|
||||
|
||||
return allPathsRemoved, nil
|
||||
}
|
||||
|
||||
// Returns list of all paths for given EBS mount
|
||||
// This is more interesting on GCE (where we are able to identify volumes under /dev/disk-by-id)
|
||||
// Here it is mostly about applying the partition path
|
||||
func getDiskByIdPaths(d *awsElasticBlockStore, devicePath string) []string {
|
||||
devicePaths := []string{}
|
||||
if devicePath != "" {
|
||||
devicePaths = append(devicePaths, devicePath)
|
||||
}
|
||||
|
||||
if d.partition != "" {
|
||||
for i, path := range devicePaths {
|
||||
devicePaths[i] = path + diskPartitionSuffix + d.partition
|
||||
}
|
||||
}
|
||||
|
||||
return devicePaths
|
||||
}
|
||||
|
||||
// Checks if the specified path exists
|
||||
func pathExists(path string) (bool, error) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
} else if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
} else {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Return cloud provider
|
||||
func getCloudProvider() (*aws.AWSCloud, error) {
|
||||
awsCloudProvider, err := cloudprovider.GetCloudProvider("aws", nil)
|
||||
if err != nil || awsCloudProvider == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The conversion must be safe otherwise bug in GetCloudProvider()
|
||||
return awsCloudProvider.(*aws.AWSCloud), nil
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ func detachDiskAndVerify(c *gcePersistentDiskCleaner) {
|
||||
// Log error, if any, and continue checking periodically.
|
||||
glog.Errorf("Error verifying GCE PD (%q) is detached: %v", c.pdName, err)
|
||||
} else if allPathsRemoved {
|
||||
// All paths to the PD have been succefully removed
|
||||
// All paths to the PD have been successfully removed
|
||||
unmountPDAndRemoveGlobalPath(c)
|
||||
glog.Infof("Successfully detached GCE PD %q.", c.pdName)
|
||||
return
|
||||
|
@ -33,20 +33,20 @@ type mockVolumes struct {
|
||||
|
||||
var _ aws.Volumes = &mockVolumes{}
|
||||
|
||||
func (v *mockVolumes) AttachDisk(instanceName string, volumeName string, readOnly bool) (string, error) {
|
||||
func (v *mockVolumes) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) {
|
||||
return "", fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) DetachDisk(instanceName string, volumeName string) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) CreateVolume(volumeOptions *aws.VolumeOptions) (volumeName string, err error) {
|
||||
func (v *mockVolumes) DetachDisk(diskName string, instanceName string) (string, error) {
|
||||
return "", fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) DeleteVolume(volumeName string) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName string, err error) {
|
||||
return "", fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) DeleteDisk(volumeName string) (bool, error) {
|
||||
return false, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (v *mockVolumes) GetVolumeLabels(volumeName string) (map[string]string, error) {
|
||||
|
@ -327,7 +327,7 @@ func createPD() (string, error) {
|
||||
}
|
||||
volumeOptions := &awscloud.VolumeOptions{}
|
||||
volumeOptions.CapacityGB = 10
|
||||
return volumes.CreateVolume(volumeOptions)
|
||||
return volumes.CreateDisk(volumeOptions)
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,7 +354,15 @@ func deletePD(pdName string) error {
|
||||
if !ok {
|
||||
return fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
return volumes.DeleteVolume(pdName)
|
||||
deleted, err := volumes.DeleteDisk(pdName)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
if !deleted {
|
||||
Logf("Volume deletion implicitly succeeded because volume %q does not exist.", pdName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -384,7 +392,8 @@ func detachPD(hostName, pdName string) error {
|
||||
if !ok {
|
||||
return fmt.Errorf("Provider does not support volumes")
|
||||
}
|
||||
return volumes.DetachDisk(hostName, pdName)
|
||||
_, err := volumes.DetachDisk(pdName, hostName)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user