From 81240da858284533d68507811f1e06c035029d43 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Sun, 21 Aug 2016 23:20:12 -0400 Subject: [PATCH] AWS: move volume attachment map to cloud level The problem is that attachments are now done on the master, and we are only caching the attachment map persistently for the local instance. So there is now a race, because the attachment map is cleared every time. Issue #29324 --- pkg/cloudprovider/providers/aws/aws.go | 81 +++++++++++++++----------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index b736f52c225..62611f8ba43 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -298,6 +298,12 @@ type Cloud struct { mutex sync.Mutex lastNodeNames sets.String lastInstancesByNodeNames []*ec2.Instance + + // We keep an active list of devices we have assigned but not yet + // attached, to avoid a race condition where we assign a device mapping + // and then get a second request before we attach the volume + attachingMutex sync.Mutex + attaching map[ /*nodeName*/ string]map[mountDevice]string } var _ Volumes = &Cloud{} @@ -675,6 +681,10 @@ func azToRegion(az string) (string, error) { // newAWSCloud creates a new instance of AWSCloud. // AWSProvider and instanceId are primarily for tests func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { + // We have some state in the Cloud object - in particular the attaching map + // Log so that if we are building multiple Cloud objects, it is obvious! + glog.Infof("Building AWS cloudprovider") + metadata, err := awsServices.Metadata() if err != nil { return nil, fmt.Errorf("error creating AWS metadata client: %v", err) @@ -725,6 +735,8 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { metadata: metadata, cfg: cfg, region: regionName, + + attaching: make(map[string]map[mountDevice]string), } selfAWSInstance, err := awsCloud.buildSelfAWSInstance() @@ -1044,13 +1056,6 @@ type awsInstance struct { // instance type instanceType string - - mutex sync.Mutex - - // We keep an active list of devices we have assigned but not yet - // attached, to avoid a race condition where we assign a device mapping - // and then get a second request before we attach the volume - attaching map[mountDevice]string } // newAWSInstance creates a new awsInstance object @@ -1069,8 +1074,6 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance { subnetID: aws.StringValue(instance.SubnetId), } - self.attaching = make(map[mountDevice]string) - return self } @@ -1104,18 +1107,12 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) { // Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice. // If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true. // Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false. -func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) { +func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) { instanceType := i.getInstanceType() if instanceType == nil { return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID) } - // We lock to prevent concurrent mounts from conflicting - // We may still conflict if someone calls the API concurrently, - // but the AWS API will then fail one of the two attach operations - i.mutex.Lock() - defer i.mutex.Unlock() - info, err := i.describeInstance() if err != nil { return "", false, err @@ -1135,7 +1132,13 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId) } - for mountDevice, volume := range i.attaching { + // We lock to prevent concurrent mounts from conflicting + // We may still conflict if someone calls the API concurrently, + // but the AWS API will then fail one of the two attach operations + c.attachingMutex.Lock() + defer c.attachingMutex.Unlock() + + for mountDevice, volume := range c.attaching[i.nodeName] { deviceMappings[mountDevice] = volume } @@ -1170,27 +1173,34 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName) } - i.attaching[chosen] = volumeID + attaching := c.attaching[i.nodeName] + if attaching == nil { + attaching = make(map[mountDevice]string) + c.attaching[i.nodeName] = attaching + } + attaching[chosen] = volumeID glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID) return chosen, false, nil } -func (i *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) { - i.mutex.Lock() - defer i.mutex.Unlock() +// endAttaching removes the entry from the "attachments in progress" map +// It returns true if it was found (and removed), false otherwise +func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool { + c.attachingMutex.Lock() + defer c.attachingMutex.Unlock() - existingVolumeID, found := i.attaching[mountDevice] + existingVolumeID, found := c.attaching[i.nodeName][mountDevice] if !found { - glog.Errorf("endAttaching on non-allocated device") - return + return false } if volumeID != existingVolumeID { glog.Errorf("endAttaching on device assigned to different volume") - return + return false } - glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID) - delete(i.attaching, mountDevice) + glog.V(2).Infof("Releasing in-process attachment entry: %s -> volume %s", mountDevice, volumeID) + delete(c.attaching[i.nodeName], mountDevice) + return true } type awsDisk struct { @@ -1382,7 +1392,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool) return "", errors.New("AWS volumes cannot be mounted read-only") } - mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true) + mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, true) if err != nil { return "", err } @@ -1394,11 +1404,13 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool) ec2Device := "/dev/xvd" + string(mountDevice) // attachEnded is set to true if the attach operation completed - // (successfully or not) + // (successfully or not), and is thus no longer in progress attachEnded := false defer func() { if attachEnded { - awsInstance.endAttaching(disk.awsID, mountDevice) + if !c.endAttaching(awsInstance, disk.awsID, mountDevice) { + glog.Errorf("endAttaching called when attach not in progress") + } } }() @@ -1424,6 +1436,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool) return "", err } + // The attach operation has finished attachEnded = true return hostDevice, nil @@ -1450,14 +1463,14 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error) return "", err } - mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false) + mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, false) if err != nil { return "", err } if !alreadyAttached { glog.Warningf("DetachDisk called on non-attached disk: %s", diskName) - // TODO: Continue? Tolerate non-attached error in DetachVolume? + // TODO: Continue? Tolerate non-attached error from the AWS DetachVolume call? } request := ec2.DetachVolumeInput{ @@ -1479,7 +1492,9 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error) } if mountDevice != "" { - awsInstance.endAttaching(disk.awsID, mountDevice) + c.endAttaching(awsInstance, disk.awsID, mountDevice) + // We don't check the return value - we don't really expect the attachment to have been + // in progress, though it might have been } hostDevicePath := "/dev/xvd" + string(mountDevice)