diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index 5ba3091486a..cba7eed90cb 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -932,9 +932,10 @@ type awsInstance struct {
 	mutex sync.Mutex
 
-	// We must cache because otherwise there is a race condition,
-	// where we assign a device mapping and then get a second request before we attach the volume
-	deviceMappings map[mountDevice]string
+	// We keep an active list of devices we have assigned but not yet
+	// attached, to avoid a race condition where we assign a device mapping
+	// and then get a second request before we attach the volume
+	attaching map[mountDevice]string
 }
 
 // newAWSInstance creates a new awsInstance object
@@ -953,8 +954,7 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance {
 		subnetID:  aws.StringValue(instance.SubnetId),
 	}
 
-	// We lazy-init deviceMappings
-	self.deviceMappings = nil
+	self.attaching = make(map[mountDevice]string)
 
 	return self
 }
@@ -1001,31 +1001,31 @@ func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned
 	self.mutex.Lock()
 	defer self.mutex.Unlock()
 
-	// We cache both for efficiency and correctness
-	if self.deviceMappings == nil {
-		info, err := self.describeInstance()
-		if err != nil {
-			return "", false, err
+	info, err := self.describeInstance()
+	if err != nil {
+		return "", false, err
+	}
+	deviceMappings := map[mountDevice]string{}
+	for _, blockDevice := range info.BlockDeviceMappings {
+		name := aws.StringValue(blockDevice.DeviceName)
+		if strings.HasPrefix(name, "/dev/sd") {
+			name = name[7:]
 		}
-		deviceMappings := map[mountDevice]string{}
-		for _, blockDevice := range info.BlockDeviceMappings {
-			name := aws.StringValue(blockDevice.DeviceName)
-			if strings.HasPrefix(name, "/dev/sd") {
-				name = name[7:]
-			}
-			if strings.HasPrefix(name, "/dev/xvd") {
-				name = name[8:]
-			}
-			if len(name) != 1 {
-				glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
-			}
-			deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
+		if strings.HasPrefix(name, "/dev/xvd") {
+			name = name[8:]
 		}
-		self.deviceMappings = deviceMappings
+		if len(name) != 1 {
+			glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
+		}
+		deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
+	}
+
+	for mountDevice, volume := range self.attaching {
+		deviceMappings[mountDevice] = volume
 	}
 
 	// Check to see if this volume is already assigned a device on this machine
-	for mountDevice, mappingVolumeID := range self.deviceMappings {
+	for mountDevice, mappingVolumeID := range deviceMappings {
 		if volumeID == mappingVolumeID {
 			if assign {
 				glog.Warningf("Got assignment call for already-assigned volume: %s@%s", mountDevice, mappingVolumeID)
@@ -1042,7 +1042,7 @@ func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned
 	valid := instanceType.getEBSMountDevices()
 	chosen := mountDevice("")
 	for _, mountDevice := range valid {
-		_, found := self.deviceMappings[mountDevice]
+		_, found := deviceMappings[mountDevice]
 		if !found {
 			chosen = mountDevice
 			break
@@ -1050,31 +1050,31 @@ func (self *awsInstance) getMountDevice(volumeID string, assign bool) (assigned
 	}
 
 	if chosen == "" {
-		glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", self.deviceMappings, valid)
+		glog.Warningf("Could not assign a mount device (all in use?). mappings=%v, valid=%v", deviceMappings, valid)
 		return "", false, nil
 	}
 
-	self.deviceMappings[chosen] = volumeID
+	self.attaching[chosen] = volumeID
 	glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
 
 	return chosen, false, nil
 }
 
-func (self *awsInstance) releaseMountDevice(volumeID string, mountDevice mountDevice) {
+func (self *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) {
 	self.mutex.Lock()
 	defer self.mutex.Unlock()
 
-	existingVolumeID, found := self.deviceMappings[mountDevice]
+	existingVolumeID, found := self.attaching[mountDevice]
 	if !found {
-		glog.Errorf("releaseMountDevice on non-allocated device")
+		glog.Errorf("endAttaching on non-allocated device")
 		return
 	}
 	if volumeID != existingVolumeID {
-		glog.Errorf("releaseMountDevice on device assigned to different volume")
+		glog.Errorf("endAttaching on device assigned to different volume")
 		return
 	}
 	glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
-	delete(self.deviceMappings, mountDevice)
+	delete(self.attaching, mountDevice)
 }
 
 type awsDisk struct {
@@ -1144,6 +1144,8 @@ func (self *awsDisk) describeVolume() (*ec2.Volume, error) {
 	return volumes[0], nil
 }
 
+// waitForAttachmentStatus polls until the attachment status is the expected value
+// TODO(justinsb): return (bool, error)
 func (self *awsDisk) waitForAttachmentStatus(status string) error {
 	// TODO: There may be a faster way to get this when we're attaching locally
 	attempt := 0
@@ -1278,10 +1280,12 @@ func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly boo
 		ec2Device = "/dev/sd" + string(mountDevice)
 	}
 
-	attached := false
+	// attachEnded is set to true if the attach operation completed
+	// (successfully or not)
+	attachEnded := false
 	defer func() {
-		if !attached {
-			awsInstance.releaseMountDevice(disk.awsID, mountDevice)
+		if attachEnded {
+			awsInstance.endAttaching(disk.awsID, mountDevice)
 		}
 	}()
 
@@ -1294,6 +1298,7 @@ func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly boo
 
 	attachResponse, err := c.ec2.AttachVolume(request)
 	if err != nil {
+		attachEnded = true
 		// TODO: Check if the volume was concurrently attached?
 		return "", fmt.Errorf("Error attaching EBS volume: %v", err)
 	}
@@ -1306,7 +1311,7 @@ func (c *AWSCloud) AttachDisk(diskName string, instanceName string, readOnly boo
 		return "", err
 	}
 
-	attached = true
+	attachEnded = true
 
 	return hostDevice, nil
 }
@@ -1346,33 +1351,15 @@ func (aws *AWSCloud) DetachDisk(diskName string, instanceName string) (string, e
 		return "", errors.New("no response from DetachVolume")
 	}
 
-	// TODO: Fix this - just remove the cache?
-	// If we don't have a cache; we don't have to wait any more (the driver does it for us)
-	// Also, maybe we could get the locally connected drivers from the AWS metadata service?
-
-	// At this point we are waiting for the volume being detached. This
-	// releases the volume and invalidates the cache even when there is a timeout.
-	//
-	// TODO: A timeout leaves the cache in an inconsistent state. The volume is still
-	// detaching though the cache shows it as ready to be attached again. Subsequent
-	// attach operations will fail. The attach is being retried and eventually
-	// works though. An option would be to completely flush the cache upon timeouts.
-	//
-	defer func() {
-		// TODO: Not thread safe?
-		for mountDevice, existingVolumeID := range awsInstance.deviceMappings {
-			if existingVolumeID == disk.awsID {
-				awsInstance.releaseMountDevice(disk.awsID, mountDevice)
-				return
-			}
-		}
-	}()
-
 	err = disk.waitForAttachmentStatus("detached")
 	if err != nil {
 		return "", err
 	}
 
+	if mountDevice != "" {
+		awsInstance.endAttaching(disk.awsID, mountDevice)
+	}
+
 	hostDevicePath := "/dev/xvd" + string(mountDevice)
 	return hostDevicePath, err
 }
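
For context on the pattern this patch adopts: instead of caching all device mappings, the instance keeps a small mutex-guarded map of devices that have been handed out but whose attach has not yet completed, and overlays it on the freshly-fetched block-device mappings when choosing a free device, so two concurrent attach requests can never be given the same device. Below is a minimal, self-contained sketch of that reservation scheme; the names (instance, fetchBlockDeviceMappings, the candidate device letters) are illustrative stand-ins, not the provider's actual API.

package main

import (
	"fmt"
	"sync"
)

type mountDevice string

// instance mirrors the shape of the change: a mutex plus a map of
// device -> volume reservations that exist only between "device
// assigned" and "attach finished". (Hypothetical, for illustration.)
type instance struct {
	mu        sync.Mutex
	attaching map[mountDevice]string
}

// fetchBlockDeviceMappings stands in for the DescribeInstances call;
// it returns the devices the cloud already reports as attached.
func (i *instance) fetchBlockDeviceMappings() map[mountDevice]string {
	return map[mountDevice]string{"a": "vol-root"} // assumed root volume
}

// getMountDevice picks a free device, consulting both the live
// mappings and the in-flight reservations, so concurrent attach
// requests cannot be handed the same device.
func (i *instance) getMountDevice(volumeID string) (mountDevice, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	mappings := i.fetchBlockDeviceMappings()
	// Overlay reservations that have not yet shown up in the live view.
	for d, v := range i.attaching {
		mappings[d] = v
	}
	// If the volume already has a device, reuse it.
	for d, v := range mappings {
		if v == volumeID {
			return d, nil
		}
	}
	// Otherwise reserve the first free candidate (illustrative set).
	for _, d := range []mountDevice{"f", "g", "h"} {
		if _, used := mappings[d]; !used {
			i.attaching[d] = volumeID
			return d, nil
		}
	}
	return "", fmt.Errorf("no free mount device for %s", volumeID)
}

// endAttaching releases the reservation once the attach or detach
// operation has finished, successfully or not.
func (i *instance) endAttaching(volumeID string, d mountDevice) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if i.attaching[d] == volumeID {
		delete(i.attaching, d)
	}
}

func main() {
	inst := &instance{attaching: make(map[mountDevice]string)}
	d, _ := inst.getMountDevice("vol-123")
	fmt.Println("assigned device:", d) // reserved until endAttaching
	inst.endAttaching("vol-123", d)
}

Because the reservation is dropped as soon as the attach operation ends (successfully or not) and the live mappings are re-fetched on every call, a timed-out attach can no longer strand a stale cache entry the way the old deviceMappings cache could.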