mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
Merge pull request #31090 from justinsb/fix_29324
Automatic merge from submit-queue AWS: fix volume device assignment race condition * Move volume attachment map to cloud level * Perform sanity check after volume attach, to double-check everything is right
This commit is contained in:
commit
130051b2d9
@ -344,6 +344,12 @@ type Cloud struct {
|
|||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
lastNodeNames sets.String
|
lastNodeNames sets.String
|
||||||
lastInstancesByNodeNames []*ec2.Instance
|
lastInstancesByNodeNames []*ec2.Instance
|
||||||
|
|
||||||
|
// We keep an active list of devices we have assigned but not yet
|
||||||
|
// attached, to avoid a race condition where we assign a device mapping
|
||||||
|
// and then get a second request before we attach the volume
|
||||||
|
attachingMutex sync.Mutex
|
||||||
|
attaching map[ /*nodeName*/ string]map[mountDevice]string
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Volumes = &Cloud{}
|
var _ Volumes = &Cloud{}
|
||||||
@ -721,6 +727,10 @@ func azToRegion(az string) (string, error) {
|
|||||||
// newAWSCloud creates a new instance of AWSCloud.
|
// newAWSCloud creates a new instance of AWSCloud.
|
||||||
// AWSProvider and instanceId are primarily for tests
|
// AWSProvider and instanceId are primarily for tests
|
||||||
func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
|
func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
|
||||||
|
// We have some state in the Cloud object - in particular the attaching map
|
||||||
|
// Log so that if we are building multiple Cloud objects, it is obvious!
|
||||||
|
glog.Infof("Building AWS cloudprovider")
|
||||||
|
|
||||||
metadata, err := awsServices.Metadata()
|
metadata, err := awsServices.Metadata()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating AWS metadata client: %v", err)
|
return nil, fmt.Errorf("error creating AWS metadata client: %v", err)
|
||||||
@ -771,6 +781,8 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
|
|||||||
metadata: metadata,
|
metadata: metadata,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
region: regionName,
|
region: regionName,
|
||||||
|
|
||||||
|
attaching: make(map[string]map[mountDevice]string),
|
||||||
}
|
}
|
||||||
|
|
||||||
selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
|
selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
|
||||||
@ -1090,13 +1102,6 @@ type awsInstance struct {
|
|||||||
|
|
||||||
// instance type
|
// instance type
|
||||||
instanceType string
|
instanceType string
|
||||||
|
|
||||||
mutex sync.Mutex
|
|
||||||
|
|
||||||
// We keep an active list of devices we have assigned but not yet
|
|
||||||
// attached, to avoid a race condition where we assign a device mapping
|
|
||||||
// and then get a second request before we attach the volume
|
|
||||||
attaching map[mountDevice]string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newAWSInstance creates a new awsInstance object
|
// newAWSInstance creates a new awsInstance object
|
||||||
@ -1115,8 +1120,6 @@ func newAWSInstance(ec2Service EC2, instance *ec2.Instance) *awsInstance {
|
|||||||
subnetID: aws.StringValue(instance.SubnetId),
|
subnetID: aws.StringValue(instance.SubnetId),
|
||||||
}
|
}
|
||||||
|
|
||||||
self.attaching = make(map[mountDevice]string)
|
|
||||||
|
|
||||||
return self
|
return self
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1150,18 +1153,12 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
|
|||||||
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
|
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
|
||||||
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
|
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
|
||||||
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
|
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
|
||||||
func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
|
func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
|
||||||
instanceType := i.getInstanceType()
|
instanceType := i.getInstanceType()
|
||||||
if instanceType == nil {
|
if instanceType == nil {
|
||||||
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
|
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We lock to prevent concurrent mounts from conflicting
|
|
||||||
// We may still conflict if someone calls the API concurrently,
|
|
||||||
// but the AWS API will then fail one of the two attach operations
|
|
||||||
i.mutex.Lock()
|
|
||||||
defer i.mutex.Unlock()
|
|
||||||
|
|
||||||
info, err := i.describeInstance()
|
info, err := i.describeInstance()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", false, err
|
return "", false, err
|
||||||
@ -1181,7 +1178,13 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
|
|||||||
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
|
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
|
||||||
}
|
}
|
||||||
|
|
||||||
for mountDevice, volume := range i.attaching {
|
// We lock to prevent concurrent mounts from conflicting
|
||||||
|
// We may still conflict if someone calls the API concurrently,
|
||||||
|
// but the AWS API will then fail one of the two attach operations
|
||||||
|
c.attachingMutex.Lock()
|
||||||
|
defer c.attachingMutex.Unlock()
|
||||||
|
|
||||||
|
for mountDevice, volume := range c.attaching[i.nodeName] {
|
||||||
deviceMappings[mountDevice] = volume
|
deviceMappings[mountDevice] = volume
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1216,27 +1219,34 @@ func (i *awsInstance) getMountDevice(volumeID string, assign bool) (assigned mou
|
|||||||
return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName)
|
return "", false, fmt.Errorf("Too many EBS volumes attached to node %s.", i.nodeName)
|
||||||
}
|
}
|
||||||
|
|
||||||
i.attaching[chosen] = volumeID
|
attaching := c.attaching[i.nodeName]
|
||||||
|
if attaching == nil {
|
||||||
|
attaching = make(map[mountDevice]string)
|
||||||
|
c.attaching[i.nodeName] = attaching
|
||||||
|
}
|
||||||
|
attaching[chosen] = volumeID
|
||||||
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
|
glog.V(2).Infof("Assigned mount device %s -> volume %s", chosen, volumeID)
|
||||||
|
|
||||||
return chosen, false, nil
|
return chosen, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *awsInstance) endAttaching(volumeID string, mountDevice mountDevice) {
|
// endAttaching removes the entry from the "attachments in progress" map
|
||||||
i.mutex.Lock()
|
// It returns true if it was found (and removed), false otherwise
|
||||||
defer i.mutex.Unlock()
|
func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
|
||||||
|
c.attachingMutex.Lock()
|
||||||
|
defer c.attachingMutex.Unlock()
|
||||||
|
|
||||||
existingVolumeID, found := i.attaching[mountDevice]
|
existingVolumeID, found := c.attaching[i.nodeName][mountDevice]
|
||||||
if !found {
|
if !found {
|
||||||
glog.Errorf("endAttaching on non-allocated device")
|
return false
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if volumeID != existingVolumeID {
|
if volumeID != existingVolumeID {
|
||||||
glog.Errorf("endAttaching on device assigned to different volume")
|
glog.Errorf("endAttaching on device assigned to different volume")
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
glog.V(2).Infof("Releasing mount device mapping: %s -> volume %s", mountDevice, volumeID)
|
glog.V(2).Infof("Releasing in-process attachment entry: %s -> volume %s", mountDevice, volumeID)
|
||||||
delete(i.attaching, mountDevice)
|
delete(c.attaching[i.nodeName], mountDevice)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
type awsDisk struct {
|
type awsDisk struct {
|
||||||
@ -1307,47 +1317,50 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// waitForAttachmentStatus polls until the attachment status is the expected value
|
// waitForAttachmentStatus polls until the attachment status is the expected value
|
||||||
// TODO(justinsb): return (bool, error)
|
// On success, it returns the last attachment state.
|
||||||
func (d *awsDisk) waitForAttachmentStatus(status string) error {
|
func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment, error) {
|
||||||
// TODO: There may be a faster way to get this when we're attaching locally
|
|
||||||
attempt := 0
|
attempt := 0
|
||||||
maxAttempts := 60
|
maxAttempts := 60
|
||||||
|
|
||||||
for {
|
for {
|
||||||
info, err := d.describeVolume()
|
info, err := d.describeVolume()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(info.Attachments) > 1 {
|
if len(info.Attachments) > 1 {
|
||||||
|
// Shouldn't happen; log so we know if it is
|
||||||
glog.Warningf("Found multiple attachments for volume: %v", info)
|
glog.Warningf("Found multiple attachments for volume: %v", info)
|
||||||
}
|
}
|
||||||
|
var attachment *ec2.VolumeAttachment
|
||||||
attachmentStatus := ""
|
attachmentStatus := ""
|
||||||
for _, attachment := range info.Attachments {
|
for _, a := range info.Attachments {
|
||||||
if attachmentStatus != "" {
|
if attachmentStatus != "" {
|
||||||
glog.Warning("Found multiple attachments: ", info)
|
// Shouldn't happen; log so we know if it is
|
||||||
|
glog.Warningf("Found multiple attachments: %v", info)
|
||||||
}
|
}
|
||||||
if attachment.State != nil {
|
if a.State != nil {
|
||||||
attachmentStatus = *attachment.State
|
attachment = a
|
||||||
|
attachmentStatus = *a.State
|
||||||
} else {
|
} else {
|
||||||
// Shouldn't happen, but don't panic...
|
// Shouldn't happen; log so we know if it is
|
||||||
glog.Warning("Ignoring nil attachment state: ", attachment)
|
glog.Warningf("Ignoring nil attachment state: %v", a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if attachmentStatus == "" {
|
if attachmentStatus == "" {
|
||||||
attachmentStatus = "detached"
|
attachmentStatus = "detached"
|
||||||
}
|
}
|
||||||
if attachmentStatus == status {
|
if attachmentStatus == status {
|
||||||
return nil
|
return attachment, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
|
||||||
|
|
||||||
attempt++
|
attempt++
|
||||||
if attempt > maxAttempts {
|
if attempt > maxAttempts {
|
||||||
glog.Warningf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
glog.Warningf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||||
return errors.New("Timeout waiting for volume state")
|
return nil, fmt.Errorf("Timeout waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
glog.V(2).Infof("Waiting for volume state: actual=%s, desired=%s", attachmentStatus, status)
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1428,7 +1441,23 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
|
|||||||
return "", errors.New("AWS volumes cannot be mounted read-only")
|
return "", errors.New("AWS volumes cannot be mounted read-only")
|
||||||
}
|
}
|
||||||
|
|
||||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, true)
|
// mountDevice will hold the device where we should try to attach the disk
|
||||||
|
var mountDevice mountDevice
|
||||||
|
// alreadyAttached is true if we have already called AttachVolume on this disk
|
||||||
|
var alreadyAttached bool
|
||||||
|
|
||||||
|
// attachEnded is set to true if the attach operation completed
|
||||||
|
// (successfully or not), and is thus no longer in progress
|
||||||
|
attachEnded := false
|
||||||
|
defer func() {
|
||||||
|
if attachEnded {
|
||||||
|
if !c.endAttaching(awsInstance, disk.awsID, mountDevice) {
|
||||||
|
glog.Errorf("endAttaching called when attach not in progress")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
mountDevice, alreadyAttached, err = c.getMountDevice(awsInstance, disk.awsID, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -1439,15 +1468,6 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
|
|||||||
// See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html
|
// See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/device_naming.html
|
||||||
ec2Device := "/dev/xvd" + string(mountDevice)
|
ec2Device := "/dev/xvd" + string(mountDevice)
|
||||||
|
|
||||||
// attachEnded is set to true if the attach operation completed
|
|
||||||
// (successfully or not)
|
|
||||||
attachEnded := false
|
|
||||||
defer func() {
|
|
||||||
if attachEnded {
|
|
||||||
awsInstance.endAttaching(disk.awsID, mountDevice)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if !alreadyAttached {
|
if !alreadyAttached {
|
||||||
request := &ec2.AttachVolumeInput{
|
request := &ec2.AttachVolumeInput{
|
||||||
Device: aws.String(ec2Device),
|
Device: aws.String(ec2Device),
|
||||||
@ -1465,13 +1485,28 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
|
|||||||
glog.V(2).Infof("AttachVolume request returned %v", attachResponse)
|
glog.V(2).Infof("AttachVolume request returned %v", attachResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = disk.waitForAttachmentStatus("attached")
|
attachment, err := disk.waitForAttachmentStatus("attached")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The attach operation has finished
|
||||||
attachEnded = true
|
attachEnded = true
|
||||||
|
|
||||||
|
// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
|
||||||
|
// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
|
||||||
|
// which could theoretically be against a different device (or even instance).
|
||||||
|
if attachment == nil {
|
||||||
|
// Impossible?
|
||||||
|
return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", diskName, instanceName)
|
||||||
|
}
|
||||||
|
if ec2Device != aws.StringValue(attachment.Device) {
|
||||||
|
return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", diskName, instanceName, ec2Device, aws.StringValue(attachment.Device))
|
||||||
|
}
|
||||||
|
if awsInstance.awsID != aws.StringValue(attachment.InstanceId) {
|
||||||
|
return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", diskName, instanceName, awsInstance.awsID, aws.StringValue(attachment.InstanceId))
|
||||||
|
}
|
||||||
|
|
||||||
return hostDevice, nil
|
return hostDevice, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1496,14 +1531,14 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
mountDevice, alreadyAttached, err := awsInstance.getMountDevice(disk.awsID, false)
|
mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, disk.awsID, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !alreadyAttached {
|
if !alreadyAttached {
|
||||||
glog.Warningf("DetachDisk called on non-attached disk: %s", diskName)
|
glog.Warningf("DetachDisk called on non-attached disk: %s", diskName)
|
||||||
// TODO: Continue? Tolerate non-attached error in DetachVolume?
|
// TODO: Continue? Tolerate non-attached error from the AWS DetachVolume call?
|
||||||
}
|
}
|
||||||
|
|
||||||
request := ec2.DetachVolumeInput{
|
request := ec2.DetachVolumeInput{
|
||||||
@ -1519,13 +1554,19 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
|
|||||||
return "", errors.New("no response from DetachVolume")
|
return "", errors.New("no response from DetachVolume")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = disk.waitForAttachmentStatus("detached")
|
attachment, err := disk.waitForAttachmentStatus("detached")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
if attachment != nil {
|
||||||
|
// We expect it to be nil, it is (maybe) interesting if it is not
|
||||||
|
glog.V(2).Infof("waitForAttachmentStatus returned non-nil attachment with state=detached: %v", attachment)
|
||||||
|
}
|
||||||
|
|
||||||
if mountDevice != "" {
|
if mountDevice != "" {
|
||||||
awsInstance.endAttaching(disk.awsID, mountDevice)
|
c.endAttaching(awsInstance, disk.awsID, mountDevice)
|
||||||
|
// We don't check the return value - we don't really expect the attachment to have been
|
||||||
|
// in progress, though it might have been
|
||||||
}
|
}
|
||||||
|
|
||||||
hostDevicePath := "/dev/xvd" + string(mountDevice)
|
hostDevicePath := "/dev/xvd" + string(mountDevice)
|
||||||
|
Loading…
Reference in New Issue
Block a user