mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Merge pull request #22791 from justinsb/fix_15073
Auto commit by PR queue bot
This commit is contained in:
commit
949b2e612a
@ -932,9 +932,10 @@ type awsInstance struct {
|
|||||||
|
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
|
|
||||||
// We must cache because otherwise there is a race condition,
|
// We keep an active list of devices we have assigned but not yet
|
||||||
// where we assign a device mapping and then get a second request before we attach the volume
|
// attached, to avoid a race condition where we assign a device mapping
|
||||||
deviceMappings map[mountDevice]string
|
// and then get a second request before we attach the volume
|
||||||
|
attaching map[mountDevice]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// newAWSInstance creates a new awsInstance object
|
// newAWSInstance creates a new awsInstance object
|
||||||
@ -953,8 +954,7 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance {
|
|||||||
subnetID: aws.StringValue(instance.SubnetId),
|
subnetID: aws.StringValue(instance.SubnetId),
|
||||||
}
|
}
|
||||||
|
|
||||||
// We lazy-init deviceMappings
|
self.attaching = make(map[mountDevice]string)
|
||||||
self.deviceMappings = nil
|
|
||||||
|
|
||||||
return self
|
return self
|
||||||
}
|
}
|
||||||
@ -1001,31 +1001,31 @@ func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned
|
|||||||
self.mutex.Lock()
|
self.mutex.Lock()
|
||||||
defer self.mutex.Unlock()
|
defer self.mutex.Unlock()
|
||||||
|
|
||||||
// We cache both for efficiency and correctness
|
info, err := self.describeInstance()
|
||||||
if self.deviceMappings == nil {
|
if err != nil {
|
||||||
info, err := self.describeInstance()
|
return "", false, err
|
||||||
if err != nil {
|
}
|
||||||
return "", false, err
|
deviceMappings := map[mountDevice]string{}
|
||||||
|
for _, blockDevice := range info.BlockDeviceMappings {
|
||||||
|
name := aws.StringValue(blockDevice.DeviceName)
|
||||||
|
if strings.HasPrefix(name, "/dev/sd") {
|
||||||
|
name = name[7:]
|
||||||
}
|
}
|
||||||
deviceMappings := map[mountDevice]string{}
|
if strings.HasPrefix(name, "/dev/xvd") {
|
||||||
for _, blockDevice := range info.BlockDeviceMappings {
|
name = name[8:]
|
||||||
name := aws.StringValue(blockDevice.DeviceName)
|
|
||||||
if strings.HasPrefix(name, "/dev/sd") {
|
|
||||||
name = name[7:]
|
|
||||||
}
|
|
||||||
if strings.HasPrefix(name, "/dev/xvd") {
|
|
||||||
name = name[8:]
|
|
||||||
}
|
|
||||||
if len(name) != 1 {
|
|
||||||
glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
|
|
||||||
}
|
|
||||||
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
|
|
||||||
}
|
}
|
||||||
self.deviceMappings = deviceMappings
|
if len(name) != 1 {
|
||||||
|
glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
|
||||||
|
}
|
||||||
|
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
|
||||||
|
}
|
||||||
|
|
||||||
|
for mountDevice, volume := range self.attaching {
|
||||||
|
deviceMappings[mountDevice] = volume
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check to see if this volume is already assigned a device on this machine
|
// Check to see if this volume is already assigned a device on this machine
|
||||||
for mountDevice, mappingVolumeID := range self.deviceMappings {
|
for mountDevice, mappingVolumeID := range deviceMappings {
|
||||||
if volumeID == mappingVolumeID {
|
if volumeID == mappingVolumeID {
|
||||||
if assign {
|
if assign {
|
||||||
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
|
glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
|
||||||
@ -1042,7 +1042,7 @@ func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned
|
|||||||
valid := instanceType.getEBSMountDevices()
|
valid := instanceType.getEBSMountDevices()
|
||||||
chosen := mountDevice("")
|
chosen := mountDevice("")
|
||||||
for _, mountDevice := range valid {
|
for _, mountDevice := range valid {
|
||||||
_, found := self.deviceMappings[mountDevice]
|
_, found := deviceMappings[mountDevice]
|
||||||
if !found {
|
if !found {
|
||||||
chosen = mountDevice
|
chosen = mountDevice
|
||||||
break
|
break
|
||||||
@ -1050,31 +1050,31 @@ func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned
|
|||||||
}
|
}
|
||||||
|
|
||||||
if chosen == "" {
|
if chosen == "" {
|
||||||
glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", self.deviceMappings, valid)
|
glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", deviceMappings, valid)
|
||||||
return "", false, nil
|
return "", false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
self.deviceMappings[chosen] = volumeID
|
self.attaching[chosen] = volumeID
|
||||||
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
|
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
|
||||||
|
|
||||||
return chosen, false, nil
|
return chosen, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *awsInstance) releaseMountDevice(volumeID string, mountDevice mountDevice) {
|
func (self *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) {
|
||||||
self.mutex.Lock()
|
self.mutex.Lock()
|
||||||
defer self.mutex.Unlock()
|
defer self.mutex.Unlock()
|
||||||
|
|
||||||
existingVolumeID, found := self.deviceMappings[mountDevice]
|
existingVolumeID, found := self.attaching[mountDevice]
|
||||||
if !found {
|
if !found {
|
||||||
glog.Errorf("releaseMountDevice on non-allocated device")
|
glog.Errorf("endAttaching on non-allocated device")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if volumeID != existingVolumeID {
|
if volumeID != existingVolumeID {
|
||||||
glog.Errorf("releaseMountDevice on device assigned to different volume")
|
glog.Errorf("endAttaching on device assigned to different volume")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
|
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
|
||||||
delete(self.deviceMappings, mountDevice)
|
delete(self.attaching, mountDevice)
|
||||||
}
|
}
|
||||||
|
|
||||||
type awsDisk struct {
|
type awsDisk struct {
|
||||||
@ -1144,6 +1144,8 @@ func (self *awsDisk) describeVolume() (*ec2.Volume, error) {
|
|||||||
return volumes[0], nil
|
return volumes[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForAttachmentStatus polls until the attachment status is the expected value
|
||||||
|
// TODO(justinsb): return (bool, error)
|
||||||
func (self *awsDisk) waitForAttachmentStatus(status string) error {
|
func (self *awsDisk) waitForAttachmentStatus(status string) error {
|
||||||
// TODO: There may be a faster way to get this when we're attaching locally
|
// TODO: There may be a faster way to get this when we're attaching locally
|
||||||
attempt := 0
|
attempt := 0
|
||||||
@ -1278,10 +1280,12 @@ func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly boo
|
|||||||
ec2Device = "/dev/sd" + string(mountDevice)
|
ec2Device = "/dev/sd" + string(mountDevice)
|
||||||
}
|
}
|
||||||
|
|
||||||
attached := false
|
// attachEnded is set to true if the attach operation completed
|
||||||
|
// (successfully or not)
|
||||||
|
attachEnded := false
|
||||||
defer func() {
|
defer func() {
|
||||||
if !attached {
|
if attachEnded {
|
||||||
awsInstance.releaseMountDevice(disk.awsID, mountDevice)
|
awsInstance.endAttaching(disk.awsID, mountDevice)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -1294,6 +1298,7 @@ func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly boo
|
|||||||
|
|
||||||
attachResponse, err := c.ec2.AttachVolume(request)
|
attachResponse, err := c.ec2.AttachVolume(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
attachEnded = true
|
||||||
// TODO: Check if the volume was concurrently attached?
|
// TODO: Check if the volume was concurrently attached?
|
||||||
return "", fmt.Errorf("Error attaching EBS volume: %v", err)
|
return "", fmt.Errorf("Error attaching EBS volume: %v", err)
|
||||||
}
|
}
|
||||||
@ -1306,7 +1311,7 @@ func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly boo
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
attached = true
|
attachEnded = true
|
||||||
|
|
||||||
return hostDevice, nil
|
return hostDevice, nil
|
||||||
}
|
}
|
||||||
@ -1346,33 +1351,15 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e
|
|||||||
return "", errors.New("no response from DetachVolume")
|
return "", errors.New("no response from DetachVolume")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Fix this - just remove the cache?
|
|
||||||
// If we don't have a cache; we don't have to wait any more (the driver does it for us)
|
|
||||||
// Also, maybe we could get the locally connected drivers from the AWS metadata service?
|
|
||||||
|
|
||||||
// At this point we are waiting for the volume being detached. This
|
|
||||||
// releases the volume and invalidates the cache even when there is a timeout.
|
|
||||||
//
|
|
||||||
// TODO: A timeout leaves the cache in an inconsistent state. The volume is still
|
|
||||||
// detaching though the cache shows it as ready to be attached again. Subsequent
|
|
||||||
// attach operations will fail. The attach is being retried and eventually
|
|
||||||
// works though. An option would be to completely flush the cache upon timeouts.
|
|
||||||
//
|
|
||||||
defer func() {
|
|
||||||
// TODO: Not thread safe?
|
|
||||||
for mountDevice, existingVolumeID := range awsInstance.deviceMappings {
|
|
||||||
if existingVolumeID == disk.awsID {
|
|
||||||
awsInstance.releaseMountDevice(disk.awsID, mountDevice)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
err = disk.waitForAttachmentStatus("detached")
|
err = disk.waitForAttachmentStatus("detached")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if mountDevice != "" {
|
||||||
|
awsInstance.endAttaching(disk.awsID, mountDevice)
|
||||||
|
}
|
||||||
|
|
||||||
hostDevicePath := "/dev/xvd" + string(mountDevice)
|
hostDevicePath := "/dev/xvd" + string(mountDevice)
|
||||||
return hostDevicePath, err
|
return hostDevicePath, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user