AWS: move volume attachment map to cloud level

The problem is that attachments are now done on the master, and we were
only caching the attachment map on the local awsInstance object. So
there was a race: the attachment map was cleared every time a new
awsInstance object was constructed, losing in-progress assignments.

Issue #29324
This commit is contained in:
Justin Santa Barbara 2016-08-21 23:20:12 -04:00
parent 98c4029275
commit 81240da858

View File

@ -298,6 +298,12 @@ type Cloud struct {
mutex sync.Mutex mutex sync.Mutex
lastNodeNames sets.String lastNodeNames sets.String
lastInstancesByNodeNames []*ec2.Instance lastInstancesByNodeNames []*ec2.Instance
// We keep an active list of devices we have assigned but not yet
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attachingMutex sync.Mutex
attaching map[ /*nodeName*/ string]map[mountDevice]string
} }
var _ Volumes = &Cloud{} var _ Volumes = &Cloud{}
@ -675,6 +681,10 @@ func azToRegion(az string) (string, error) {
// newAWSCloud creates a new instance of AWSCloud. // newAWSCloud creates a new instance of AWSCloud.
// AWSProvider and instanceId are primarily for tests // AWSProvider and instanceId are primarily for tests
func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
// We have some state in the Cloud object - in particular the attaching map
// Log so that if we are building multiple Cloud objects, it is obvious!
glog.Infof("Building AWS cloudprovider")
metadata, err := awsServices.Metadata() metadata, err := awsServices.Metadata()
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating AWS metadata client: %v", err) return nil, fmt.Errorf("error creating AWS metadata client: %v", err)
@ -725,6 +735,8 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
metadata: metadata, metadata: metadata,
cfg: cfg, cfg: cfg,
region: regionName, region: regionName,
attaching: make(map[string]map[mountDevice]string),
} }
selfAWSInstance, err := awsCloud.buildSelfAWSInstance() selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
@ -1044,13 +1056,6 @@ type awsInstance struct {
// instance type // instance type
instanceType string instanceType string
mutex sync.Mutex
// We keep an active list of devices we have assigned but not yet
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attaching map[mountDevice]string
} }
// newAWSInstance creates a new awsInstance object // newAWSInstance creates a new awsInstance object
@ -1069,8 +1074,6 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance {
subnetID: aws.StringValue(instance.SubnetId), subnetID: aws.StringValue(instance.SubnetId),
} }
self.attaching = make(map[mountDevice]string)
return self return self
} }
@ -1104,18 +1107,12 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice. // Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true. // If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false. // Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) { func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
instanceType := i.getInstanceType() instanceType := i.getInstanceType()
if instanceType == nil { if instanceType == nil {
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID) return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
} }
// We lock to prevent concurrent mounts from conflicting
// We may still conflict if someone calls the API concurrently,
// but the AWS API will then fail one of the two attach operations
i.mutex.Lock()
defer i.mutex.Unlock()
info, err := i.describeInstance() info, err := i.describeInstance()
if err != nil { if err != nil {
return "", false, err return "", false, err
@ -1135,7 +1132,13 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId) deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
} }
for mountDevice, volume := range i.attaching { // We lock to prevent concurrent mounts from conflicting
// We may still conflict if someone calls the API concurrently,
// but the AWS API will then fail one of the two attach operations
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()
for mountDevice, volume := range c.attaching[i.nodeName] {
deviceMappings[mountDevice] = volume deviceMappings[mountDevice] = volume
} }
@ -1170,27 +1173,34 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName) return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName)
} }
i.attaching[chosen] = volumeID attaching := c.attaching[i.nodeName]
if attaching == nil {
attaching = make(map[mountDevice]string)
c.attaching[i.nodeName] = attaching
}
attaching[chosen] = volumeID
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID) glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
return chosen, false, nil return chosen, false, nil
} }
func (i *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) { // endAttaching removes the entry from the "attachments in progress" map
i.mutex.Lock() // It returns true if it was found (and removed), false otherwise
defer i.mutex.Unlock() func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()
existingVolumeID, found := i.attaching[mountDevice] existingVolumeID, found := c.attaching[i.nodeName][mountDevice]
if !found { if !found {
glog.Errorf("endAttaching on non-allocated device") return false
return
} }
if volumeID != existingVolumeID { if volumeID != existingVolumeID {
glog.Errorf("endAttaching on device assigned to different volume") glog.Errorf("endAttaching on device assigned to different volume")
return return false
} }
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID) glog.V(2).Infof("Releasing in-process attachment entry: %s -> volume %s", mountDevice, volumeID)
delete(i.attaching, mountDevice) delete(c.attaching[i.nodeName], mountDevice)
return true
} }
type awsDisk struct { type awsDisk struct {
@ -1382,7 +1392,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
return "", errors.New("AWS volumes cannot be mounted read-only") return "", errors.New("AWS volumes cannot be mounted read-only")
} }
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true) mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, true)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -1394,11 +1404,13 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
ec2Device := "/dev/xvd" + string(mountDevice) ec2Device := "/dev/xvd" + string(mountDevice)
// attachEnded is set to true if the attach operation completed // attachEnded is set to true if the attach operation completed
// (successfully or not) // (successfully or not), and is thus no longer in progress
attachEnded := false attachEnded := false
defer func() { defer func() {
if attachEnded { if attachEnded {
awsInstance.endAttaching(disk.awsID, mountDevice) if !c.endAttaching(awsInstance, disk.awsID, mountDevice) {
glog.Errorf("endAttaching called when attach not in progress")
}
} }
}() }()
@ -1424,6 +1436,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
return "", err return "", err
} }
// The attach operation has finished
attachEnded = true attachEnded = true
return hostDevice, nil return hostDevice, nil
@ -1450,14 +1463,14 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
return "", err return "", err
} }
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false) mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, false)
if err != nil { if err != nil {
return "", err return "", err
} }
if !alreadyAttached { if !alreadyAttached {
glog.Warningf("DetachDisk called on non-attached disk: %s", diskName) glog.Warningf("DetachDisk called on non-attached disk: %s", diskName)
// TODO: Continue? Tolerate non-attached error in DetachVolume? // TODO: Continue? Tolerate non-attached error from the AWS DetachVolume call?
} }
request := ec2.DetachVolumeInput{ request := ec2.DetachVolumeInput{
@ -1479,7 +1492,9 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
} }
if mountDevice != "" { if mountDevice != "" {
awsInstance.endAttaching(disk.awsID, mountDevice) c.endAttaching(awsInstance, disk.awsID, mountDevice)
// We don't check the return value - we don't really expect the attachment to have been
// in progress, though it might have been
} }
hostDevicePath := "/dev/xvd" + string(mountDevice) hostDevicePath := "/dev/xvd" + string(mountDevice)