Merge pull request #38766 from jsafrane/wait-attach-backoff

Automatic merge from submit-queue

AWS: Add exponential backoff to waitForAttachmentStatus() and createTags()

We should use exponential backoff while waiting for a volume to be attached to or detached from a node. This will lower AWS load and reduce API call throttling.

This partly fixes #33088

@justinsb, can you please take a look?
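
For context, the diff below replaces hand-rolled retry loops with the wait.ExponentialBackoff helper from k8s.io/kubernetes/pkg/util/wait. A minimal standalone sketch of that pattern follows; the numbers and the dummy condition are illustrative only, not the provider's real EC2 poll:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

func main() {
	backoff := wait.Backoff{
		Duration: 10 * time.Second, // first delay between polls
		Factor:   1.2,              // each following delay is 1.2x longer
		Steps:    21,               // give up after 21 condition checks
	}

	polls := 0
	// The condition returns (true, nil) to stop successfully, (false, nil) to
	// keep polling, or (false, err) to abort immediately. If all steps are
	// exhausted, ExponentialBackoff returns wait.ErrWaitTimeout.
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		polls++
		return polls >= 3, nil // pretend the resource is ready on the 3rd poll
	})
	if err == wait.ErrWaitTimeout {
		fmt.Println("gave up waiting")
	}
	fmt.Printf("polls=%d err=%v\n", polls, err)
}
```

The two call sites changed below, waitForAttachmentStatus() and createTags(), follow exactly this shape.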
Commit 6d0efbc9d6, authored by Kubernetes Submit Queue on 2017-01-05 03:08:04 -08:00, committed by GitHub.
2 changed files with 50 additions and 37 deletions


@@ -31,6 +31,7 @@ go_library(
         "//pkg/credentialprovider/aws:go_default_library",
         "//pkg/types:go_default_library",
         "//pkg/util/sets:go_default_library",
+        "//pkg/util/wait:go_default_library",
         "//pkg/volume:go_default_library",
         "//vendor:github.com/aws/aws-sdk-go/aws",
         "//vendor:github.com/aws/aws-sdk-go/aws/awserr",


@@ -48,6 +48,7 @@ import (
     awscredentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util/sets"
+    "k8s.io/kubernetes/pkg/util/wait"
     "k8s.io/kubernetes/pkg/volume"
 )
@@ -136,16 +137,25 @@ const ServiceAnnotationLoadBalancerSSLPorts = "service.beta.kubernetes.io/aws-lo
 const ServiceAnnotationLoadBalancerBEProtocol = "service.beta.kubernetes.io/aws-load-balancer-backend-protocol"

 const (
-    // volumeAttachmentStatusTimeout is the maximum time to wait for a volume attach/detach to complete
-    volumeAttachmentStatusTimeout = 30 * time.Minute
     // volumeAttachmentConsecutiveErrorLimit is the number of consecutive errors we will ignore when waiting for a volume to attach/detach
     volumeAttachmentStatusConsecutiveErrorLimit = 10
-    // volumeAttachmentErrorDelay is the amount of time we wait before retrying after encountering an error,
-    // while waiting for a volume attach/detach to complete
-    volumeAttachmentStatusErrorDelay = 20 * time.Second
-    // volumeAttachmentStatusPollInterval is the interval at which we poll the volume,
-    // while waiting for a volume attach/detach to complete
-    volumeAttachmentStatusPollInterval = 10 * time.Second
+    // volumeAttachmentStatus* is configuration of exponential backoff for
+    // waiting for attach/detach operation to complete. Starting with 10
+    // seconds, multiplying by 1.2 with each step and taking 21 steps at maximum
+    // it will time out after 31.11 minutes, which roughly corresponds to GCE
+    // timeout (30 minutes).
+    volumeAttachmentStatusInitialDelay = 10 * time.Second
+    volumeAttachmentStatusFactor       = 1.2
+    volumeAttachmentStatusSteps        = 21
+
+    // createTag* is configuration of exponential backoff for CreateTag call. We
+    // retry mainly because if we create an object, we cannot tag it until it is
+    // "fully created" (eventual consistency). Starting with 1 second, doubling
+    // it every step and taking 9 steps results in 255 second total waiting
+    // time.
+    createTagInitialDelay = 1 * time.Second
+    createTagFactor       = 2.0
+    createTagSteps        = 9
 )

 // Maps from backend protocol to ELB protocol
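
As a sanity check on the figures quoted in the new comments, the totals can be reproduced by summing the successive delays. The small standalone program below assumes the backoff sleeps between condition checks (one fewer sleep than Steps), which is what yields the 31.11-minute and 255-second numbers:

```go
package main

import (
	"fmt"
	"time"
)

// totalSleep sums n exponentially growing delays: d, d*f, d*f^2, ...
func totalSleep(d time.Duration, f float64, n int) time.Duration {
	cur, sum := float64(d), 0.0
	for i := 0; i < n; i++ {
		sum += cur
		cur *= f
	}
	return time.Duration(sum)
}

func main() {
	// Attach/detach wait: 21 steps => 20 sleeps of 10s, 12s, 14.4s, ...
	fmt.Println(totalSleep(10*time.Second, 1.2, 20)) // ~31m7s (31.11 minutes)
	// CreateTags retry: 9 steps => 8 sleeps of 1s, 2s, 4s, ..., 128s
	fmt.Println(totalSleep(1*time.Second, 2.0, 8)) // 4m15s (255 seconds)
}
```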
@@ -1304,25 +1314,28 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
 // waitForAttachmentStatus polls until the attachment status is the expected value
 // On success, it returns the last attachment state.
 func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment, error) {
-    // We wait up to 30 minutes for the attachment to complete.
-    // This mirrors the GCE timeout.
-    timeoutAt := time.Now().UTC().Add(volumeAttachmentStatusTimeout).Unix()
+    backoff := wait.Backoff{
+        Duration: volumeAttachmentStatusInitialDelay,
+        Factor:   volumeAttachmentStatusFactor,
+        Steps:    volumeAttachmentStatusSteps,
+    }

     // Because of rate limiting, we often see errors from describeVolume
     // So we tolerate a limited number of failures.
     // But once we see more than 10 errors in a row, we return the error
     describeErrorCount := 0
+    var attachment *ec2.VolumeAttachment

-    for {
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         info, err := d.describeVolume()
         if err != nil {
             describeErrorCount++
             if describeErrorCount > volumeAttachmentStatusConsecutiveErrorLimit {
-                return nil, err
+                // report the error
+                return false, err
             } else {
                 glog.Warningf("Ignoring error from describe volume; will retry: %q", err)
-                time.Sleep(volumeAttachmentStatusErrorDelay)
-                continue
+                return false, nil
             }
         } else {
             describeErrorCount = 0
@@ -1331,7 +1344,6 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
             // Shouldn't happen; log so we know if it is
             glog.Warningf("Found multiple attachments for volume %q: %v", d.awsID, info)
         }
-        var attachment *ec2.VolumeAttachment
         attachmentStatus := ""
         for _, a := range info.Attachments {
             if attachmentStatus != "" {
@@ -1350,18 +1362,15 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
             attachmentStatus = "detached"
         }
         if attachmentStatus == status {
-            return attachment, nil
+            // Attachment is in requested state, finish waiting
+            return true, nil
         }

         glog.V(2).Infof("Waiting for volume %q state: actual=%s, desired=%s", d.awsID, attachmentStatus, status)
+        return false, nil
+    })

-        time.Sleep(volumeAttachmentStatusPollInterval)
-    }
+    return attachment, err
 }

 // Deletes the EBS disk
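
Note how the closure's return values drive the new loop in waitForAttachmentStatus: (false, nil) means "not in the desired state yet, poll again after the next delay", (false, err) aborts once the consecutive describeVolume error limit is exceeded, and (true, nil) ends the wait with the attachment captured in the enclosing variable. If all 21 steps are used up, wait.ExponentialBackoff returns wait.ErrWaitTimeout, which the function now passes straight back to its caller.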
@@ -2242,30 +2251,33 @@ func (c *Cloud) createTags(resourceID string, tags map[string]string) error {
         awsTags = append(awsTags, tag)
     }

+    backoff := wait.Backoff{
+        Duration: createTagInitialDelay,
+        Factor:   createTagFactor,
+        Steps:    createTagSteps,
+    }
     request := &ec2.CreateTagsInput{}
     request.Resources = []*string{&resourceID}
     request.Tags = awsTags

-    // TODO: We really should do exponential backoff here
-    attempt := 0
-    maxAttempts := 60
-
-    for {
+    var lastErr error
+    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
         _, err := c.ec2.CreateTags(request)
         if err == nil {
-            return nil
+            return true, nil
         }

         // We could check that the error is retryable, but the error code changes based on what we are tagging
         // SecurityGroup: InvalidGroup.NotFound
-        attempt++
-        if attempt > maxAttempts {
-            glog.Warningf("Failed to create tags (too many attempts): %v", err)
-            return err
-        }
         glog.V(2).Infof("Failed to create tags; will retry. Error was %v", err)
-        time.Sleep(1 * time.Second)
+        lastErr = err
+        return false, nil
+    })
+    if err == wait.ErrWaitTimeout {
+        // return real CreateTags error instead of timeout
+        err = lastErr
     }
+    return err
 }

 // Finds the value for a given tag.
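
The createTags conversion adds one refinement: the last real CreateTags error is remembered in lastErr and substituted for the generic wait.ErrWaitTimeout before returning, so callers still see the underlying AWS error (for example the eventual-consistency NotFound responses mentioned in the comment) rather than a bare timeout.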