diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index 4bcbbd2816d..afc206024d7 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -344,6 +344,12 @@ type Cloud struct {
 	mutex                    sync.Mutex
 	lastNodeNames            sets.String
 	lastInstancesByNodeNames []*ec2.Instance
+
+	// We keep an active list of devices we have assigned but not yet
+	// attached, to avoid a race condition where we assign a device mapping
+	// and then get a second request before we attach the volume
+	attachingMutex sync.Mutex
+	attaching      map[ /*nodeName*/ string]map[mountDevice]string
 }
 
 var _ Volumes = &Cloud{}
@@ -721,6 +727,10 @@ func azToRegion(az string) (string, error) {
 // newAWSCloud creates a new instance of AWSCloud.
 // AWSProvider and instanceId are primarily for tests
 func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
+	// We have some state in the Cloud object - in particular the attaching map
+	// Log so that if we are building multiple Cloud objects, it is obvious!
+	glog.Infof("Building AWS cloudprovider")
+
 	metadata, err := awsServices.Metadata()
 	if err != nil {
 		return nil, fmt.Errorf("error creating AWS metadata client: %v", err)
@@ -771,6 +781,8 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
 		metadata: metadata,
 		cfg:      cfg,
 		region:   regionName,
+
+		attaching: make(map[string]map[mountDevice]string),
 	}
 
 	selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
@@ -1090,13 +1102,6 @@ type awsInstance struct {
 
 	// instance type
 	instanceType string
-
-	mutex sync.Mutex
-
-	// We keep an active list of devices we have assigned but not yet
-	// attached, to avoid a race condition where we assign a device mapping
-	// and then get a second request before we attach the volume
-	attaching map[mountDevice]string
 }
 
 // newAWSInstance creates a new awsInstance object
@@ -1115,8 +1120,6 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance {
 		subnetID:  aws.StringValue(instance.SubnetId),
 	}
 
-	self.attaching = make(map[mountDevice]string)
-
 	return self
 }
 
@@ -1150,18 +1153,12 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
 // Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
 // If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
 // Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
-func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
+func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
 	instanceType := i.getInstanceType()
 	if instanceType == nil {
 		return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
 	}
 
-	// We lock to prevent concurrent mounts from conflicting
-	// We may still conflict if someone calls the API concurrently,
-	// but the AWS API will then fail one of the two attach operations
-	i.mutex.Lock()
-	defer i.mutex.Unlock()
-
 	info, err := i.describeInstance()
 	if err != nil {
 		return "", false, err
@@ -1181,7 +1178,13 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
 		deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
 	}
 
-	for mountDevice, volume := range i.attaching {
+	// We lock to prevent concurrent mounts from conflicting
+	// We may still conflict if someone calls the API concurrently,
+	// but the AWS API will then fail one of the two attach operations
+	c.attachingMutex.Lock()
+	defer c.attachingMutex.Unlock()
+
+	for mountDevice, volume := range c.attaching[i.nodeName] {
 		deviceMappings[mountDevice] = volume
 	}
@@ -1216,27 +1219,34 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
 		return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName)
 	}
 
-	i.attaching[chosen] = volumeID
+	attaching := c.attaching[i.nodeName]
+	if attaching == nil {
+		attaching = make(map[mountDevice]string)
+		c.attaching[i.nodeName] = attaching
+	}
+	attaching[chosen] = volumeID
 	glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
 
 	return chosen, false, nil
 }
 
-func (i *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) {
-	i.mutex.Lock()
-	defer i.mutex.Unlock()
+// endAttaching removes the entry from the "attachments in progress" map
+// It returns true if it was found (and removed), false otherwise
+func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
+	c.attachingMutex.Lock()
+	defer c.attachingMutex.Unlock()
 
-	existingVolumeID, found := i.attaching[mountDevice]
+	existingVolumeID, found := c.attaching[i.nodeName][mountDevice]
 	if !found {
-		glog.Errorf("endAttaching on non-allocated device")
-		return
+		return false
 	}
 	if volumeID != existingVolumeID {
 		glog.Errorf("endAttaching on device assigned to different volume")
-		return
+		return false
 	}
-	glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
-	delete(i.attaching, mountDevice)
+	glog.V(2).Infof("Releasing in-process attachment entry: %s -> volume %s", mountDevice, volumeID)
+	delete(c.attaching[i.nodeName], mountDevice)
+	return true
 }
 
 type awsDisk struct {
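// --- Aside: illustrative sketch, not part of the patch above ---
// The hunks above replace the per-instance mutex and map with a single
// Cloud-level "attachments in progress" map, keyed by node name and guarded
// by one mutex. A minimal standalone sketch of that shape follows; the names
// deviceAllocator, assign, and release are hypothetical, invented for this
// sketch - only the mutex-plus-nested-map pattern mirrors the patch.

package main

import (
	"fmt"
	"sync"
)

type mountDevice string

// deviceAllocator mirrors the new Cloud fields: one mutex plus a
// nodeName -> mountDevice -> volumeID map of attachments in progress.
type deviceAllocator struct {
	mu        sync.Mutex
	attaching map[string]map[mountDevice]string
}

// assign records that volumeID is being attached to node at device,
// lazily allocating the per-node map as the patched getMountDevice does.
func (d *deviceAllocator) assign(node string, device mountDevice, volumeID string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	attaching := d.attaching[node]
	if attaching == nil {
		attaching = make(map[mountDevice]string)
		d.attaching[node] = attaching
	}
	attaching[device] = volumeID
}

// release removes the entry and reports whether it was present, like the
// patched endAttaching returning false for an unknown or mismatched entry.
func (d *deviceAllocator) release(node string, device mountDevice, volumeID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	existing, found := d.attaching[node][device]
	if !found || existing != volumeID {
		return false
	}
	delete(d.attaching[node], device)
	return true
}

func main() {
	d := &deviceAllocator{attaching: make(map[string]map[mountDevice]string)}
	d.assign("node-a", "ba", "vol-123")
	fmt.Println(d.release("node-a", "ba", "vol-123")) // true
	fmt.Println(d.release("node-a", "ba", "vol-123")) // false: nothing in progress
}
// --- End aside ---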
@@ -1307,47 +1317,50 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
 }
 
 // waitForAttachmentStatus polls until the attachment status is the expected value
-// TODO(justinsb): return (bool, error)
-func (d *awsDisk) waitForAttachmentStatus(status string) error {
-	// TODO: There may be a faster way to get this when we're attaching locally
+// On success, it returns the last attachment state.
+func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment, error) {
 	attempt := 0
 	maxAttempts := 60
 
 	for {
 		info, err := d.describeVolume()
 		if err != nil {
-			return err
+			return nil, err
 		}
+
 		if len(info.Attachments) > 1 {
+			// Shouldn't happen; log so we know if it is
 			glog.Warningf("Found multiple attachments for volume: %v", info)
 		}
+		var attachment *ec2.VolumeAttachment
 		attachmentStatus := ""
-		for _, attachment := range info.Attachments {
+		for _, a := range info.Attachments {
 			if attachmentStatus != "" {
-				glog.Warning("Found multiple attachments: ", info)
+				// Shouldn't happen; log so we know if it is
+				glog.Warningf("Found multiple attachments: %v", info)
 			}
-			if attachment.State != nil {
-				attachmentStatus = *attachment.State
+			if a.State != nil {
+				attachment = a
+				attachmentStatus = *a.State
 			} else {
-				// Shouldn't happen, but don't panic...
-				glog.Warning("Ignoring nil attachment state: ", attachment)
+				// Shouldn't happen; log so we know if it is
+				glog.Warningf("Ignoring nil attachment state: %v", a)
 			}
 		}
 		if attachmentStatus == "" {
 			attachmentStatus = "detached"
 		}
 		if attachmentStatus == status {
-			return nil
+			return attachment, nil
 		}
 
-		glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
-
 		attempt++
 		if attempt > maxAttempts {
 			glog.Warningf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
-			return errors.New("Timeout waiting for volume state")
+			return nil, fmt.Errorf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
 		}
 
+		glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
+
 		time.Sleep(1 * time.Second)
 	}
 }
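// --- Aside: illustrative sketch, not part of the patch above ---
// waitForAttachmentStatus now returns the last-observed *ec2.VolumeAttachment
// so the caller can verify what was actually attached. A minimal sketch of
// that poll-then-return-final-state shape, with a hypothetical describe()
// callback standing in for the EC2 DescribeVolumes round trip:

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil polls describe() once per second until it reports the desired
// state, returning the final observed state; it gives up with an error
// after maxAttempts polls, like the patched waitForAttachmentStatus.
func pollUntil(desired string, maxAttempts int, describe func() (string, error)) (string, error) {
	for attempt := 0; ; attempt++ {
		state, err := describe()
		if err != nil {
			return "", err
		}
		if state == desired {
			return state, nil
		}
		if attempt >= maxAttempts {
			return "", errors.New("timeout waiting for state, last seen: " + state)
		}
		time.Sleep(1 * time.Second)
	}
}

func main() {
	calls := 0
	// Simulate a volume that reports "attaching" twice before "attached".
	state, err := pollUntil("attached", 60, func() (string, error) {
		calls++
		if calls < 3 {
			return "attaching", nil
		}
		return "attached", nil
	})
	fmt.Println(state, err) // attached <nil>
}
// --- End aside ---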
@@ -1428,7 +1441,23 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
 		return "", errors.New("AWS volumes cannot be mounted read-only")
 	}
 
-	mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true)
+	// mountDevice will hold the device where we should try to attach the disk
+	var mountDevice mountDevice
+	// alreadyAttached is true if we have already called AttachVolume on this disk
+	var alreadyAttached bool
+
+	// attachEnded is set to true if the attach operation completed
+	// (successfully or not), and is thus no longer in progress
+	attachEnded := false
+	defer func() {
+		if attachEnded {
+			if !c.endAttaching(awsInstance, disk.awsID, mountDevice) {
+				glog.Errorf("endAttaching called when attach not in progress")
+			}
+		}
+	}()
+
+	mountDevice, alreadyAttached, err = c.getMountDevice(awsInstance, disk.awsID, true)
 	if err != nil {
 		return "", err
 	}
@@ -1439,15 +1468,6 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
 	// See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html
 	ec2Device := "/dev/xvd" + string(mountDevice)
 
-	// attachEnded is set to true if the attach operation completed
-	// (successfully or not)
-	attachEnded := false
-	defer func() {
-		if attachEnded {
-			awsInstance.endAttaching(disk.awsID, mountDevice)
-		}
-	}()
-
 	if !alreadyAttached {
 		request := &ec2.AttachVolumeInput{
 			Device:   aws.String(ec2Device),
@@ -1465,13 +1485,28 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
 		glog.V(2).Infof("AttachVolume request returned %v", attachResponse)
 	}
 
-	err = disk.waitForAttachmentStatus("attached")
+	attachment, err := disk.waitForAttachmentStatus("attached")
 	if err != nil {
 		return "", err
 	}
 
+	// The attach operation has finished
 	attachEnded = true
 
+	// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
+	// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
+	// which could theoretically be against a different device (or even instance).
+	if attachment == nil {
+		// Impossible?
+		return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", diskName, instanceName)
+	}
+	if ec2Device != aws.StringValue(attachment.Device) {
+		return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", diskName, instanceName, ec2Device, aws.StringValue(attachment.Device))
+	}
+	if awsInstance.awsID != aws.StringValue(attachment.InstanceId) {
+		return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", diskName, instanceName, awsInstance.awsID, aws.StringValue(attachment.InstanceId))
+	}
+
 	return hostDevice, nil
 }
@@ -1496,14 +1531,14 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
 		return "", err
 	}
 
-	mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false)
+	mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, false)
 	if err != nil {
 		return "", err
 	}
 
 	if !alreadyAttached {
 		glog.Warningf("DetachDisk called on non-attached disk: %s", diskName)
-		// TODO: Continue? Tolerate non-attached error in DetachVolume?
+		// TODO: Continue? Tolerate non-attached error from the AWS DetachVolume call?
 	}
 
 	request := ec2.DetachVolumeInput{
@@ -1519,13 +1554,19 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
 		return "", errors.New("no response from DetachVolume")
 	}
 
-	err = disk.waitForAttachmentStatus("detached")
+	attachment, err := disk.waitForAttachmentStatus("detached")
 	if err != nil {
 		return "", err
 	}
+	if attachment != nil {
+		// We expect it to be nil, it is (maybe) interesting if it is not
+		glog.V(2).Infof("waitForAttachmentStatus returned non-nil attachment with state=detached: %v", attachment)
+	}
 
 	if mountDevice != "" {
-		awsInstance.endAttaching(disk.awsID, mountDevice)
+		c.endAttaching(awsInstance, disk.awsID, mountDevice)
+		// We don't check the return value - we don't really expect the attachment to have been
+		// in progress, though it might have been
 	}
 
 	hostDevicePath := "/dev/xvd" + string(mountDevice)
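// --- Aside: illustrative sketch, not part of the patch above ---
// The double check added to AttachDisk guards against acting on a stale
// attachment left by a previous/separate AttachVolume call. A minimal sketch
// of the same verification, using a hypothetical plain struct in place of
// *ec2.VolumeAttachment (verifyAttachment is invented for this sketch):

package main

import "fmt"

// volumeAttachment stands in for the device/instance fields of
// *ec2.VolumeAttachment that the patch inspects.
type volumeAttachment struct {
	Device     string
	InstanceID string
}

// verifyAttachment returns an error unless the observed attachment matches
// the device and instance that were requested, mirroring the three checks
// the patch adds after waitForAttachmentStatus("attached") succeeds.
func verifyAttachment(observed *volumeAttachment, wantDevice, wantInstance string) error {
	if observed == nil {
		return fmt.Errorf("unexpected state: attachment nil after attach")
	}
	if observed.Device != wantDevice {
		return fmt.Errorf("requested device %q but found %q", wantDevice, observed.Device)
	}
	if observed.InstanceID != wantInstance {
		return fmt.Errorf("requested instance %q but found %q", wantInstance, observed.InstanceID)
	}
	return nil
}

func main() {
	got := &volumeAttachment{Device: "/dev/xvdba", InstanceID: "i-0123456"}
	fmt.Println(verifyAttachment(got, "/dev/xvdba", "i-0123456")) // <nil>
	fmt.Println(verifyAttachment(got, "/dev/xvdbb", "i-0123456")) // device mismatch
}
// --- End aside ---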