mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-11-04 07:49:35 +00:00
	Merge pull request #19335 from justinsb/aws_delay_when_requestlimitexceeded
Auto commit by PR queue bot
@@ -228,13 +228,57 @@ type awsSdkEC2 struct {
 
 type awsSDKProvider struct {
 	creds *credentials.Credentials
+
+	mutex          sync.Mutex
+	regionDelayers map[string]*CrossRequestRetryDelay
 }
 
-func addHandlers(h *request.Handlers) {
+func newAWSSDKProvider(creds *credentials.Credentials) *awsSDKProvider {
+	return &awsSDKProvider{
+		creds:          creds,
+		regionDelayers: make(map[string]*CrossRequestRetryDelay),
+	}
+}
+
+func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) {
 	h.Sign.PushFrontNamed(request.NamedHandler{
 		Name: "k8s/logger",
 		Fn:   awsHandlerLogger,
 	})
+
+	delayer := p.getCrossRequestRetryDelay(regionName)
+	if delayer != nil {
+		h.Sign.PushFrontNamed(request.NamedHandler{
+			Name: "k8s/delay-presign",
+			Fn:   delayer.BeforeSign,
+		})
+
+		h.AfterRetry.PushFrontNamed(request.NamedHandler{
+			Name: "k8s/delay-afterretry",
+			Fn:   delayer.AfterRetry,
+		})
+	}
 }
+
+// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
+// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
+// We do this to protect the AWS account from becoming overloaded and effectively locked.
+// We also log when we hit request limits.
+// Note that this delays the current goroutine; this is bad behaviour and will
+// likely cause k8s to become slow or unresponsive for cloud operations.
+// However, this throttle is intended only as a last resort.  When we observe
+// this throttling, we need to address the root cause (e.g. add a delay to a
+// controller retry loop)
+func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	delayer, found := p.regionDelayers[regionName]
+	if !found {
+		delayer = NewCrossRequestRetryDelay()
+		p.regionDelayers[regionName] = delayer
+	}
+	return delayer
+}
 
 func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
@@ -243,7 +287,7 @@ func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
 		Credentials: p.creds,
 	}))
 
-	addHandlers(&service.Handlers)
+	p.addHandlers(regionName, &service.Handlers)
 
 	ec2 := &awsSdkEC2{
 		ec2: service,
@@ -257,7 +301,7 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) {
 		Credentials: p.creds,
 	}))
 
-	addHandlers(&elbClient.Handlers)
+	p.addHandlers(regionName, &elbClient.Handlers)
 
 	return elbClient, nil
 }
@@ -268,7 +312,7 @@ func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) {
 		Credentials: p.creds,
 	}))
 
-	addHandlers(&client.Handlers)
+	p.addHandlers(regionName, &client.Handlers)
 
 	return client, nil
 }
@@ -458,7 +502,7 @@ func init() {
 			},
 			&credentials.SharedCredentialsProvider{},
 		})
-		aws := &awsSDKProvider{creds: creds}
+		aws := newAWSSDKProvider(creds)
 		return newAWSCloud(config, aws)
 	})
 }
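The hunks above build on aws-sdk-go's named handler chains: every service client exposes a request.Handlers value whose phases (Sign, Send, AfterRetry, and so on) run request.NamedHandler entries in order, which is how the delayer's BeforeSign and AfterRetry hooks get attached. A minimal standalone sketch of the same mechanism follows; the handler name "example/logger" and the region are illustrative and not part of the commit.

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
	svc := ec2.New(sess)

	// Push a handler onto the front of the Sign phase, the same place the
	// commit inserts its "k8s/delay-presign" delayer, so it runs before each
	// request is signed.
	svc.Handlers.Sign.PushFrontNamed(request.NamedHandler{
		Name: "example/logger", // illustrative name, not from the commit
		Fn: func(r *request.Request) {
			log.Printf("about to call %s::%s", r.ClientInfo.ServiceName, r.Operation.Name)
		},
	})

	// Every call on svc now passes through the handler first.
	_, _ = svc.DescribeInstances(&ec2.DescribeInstancesInput{})
}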
pkg/cloudprovider/providers/aws/retry_handler.go (new file, 161 lines)
@@ -0,0 +1,161 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
	"math"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/golang/glog"
)

const (
	decayIntervalSeconds = 20
	decayFraction        = 0.8
	maxDelay             = 60 * time.Second
)

// CrossRequestRetryDelay inserts delays before AWS calls, when we are observing RequestLimitExceeded errors
// Note that we share a CrossRequestRetryDelay across multiple AWS requests; this is a process-wide back-off,
// whereas the aws-sdk-go implements a per-request exponential backoff/retry
type CrossRequestRetryDelay struct {
	backoff Backoff
}

// Create a new CrossRequestRetryDelay
func NewCrossRequestRetryDelay() *CrossRequestRetryDelay {
	c := &CrossRequestRetryDelay{}
	c.backoff.init(decayIntervalSeconds, decayFraction, maxDelay)
	return c
}

// Added to the Sign chain; called before each request
func (c *CrossRequestRetryDelay) BeforeSign(r *request.Request) {
	now := time.Now()
	delay := c.backoff.ComputeDelayForRequest(now)
	if delay > 0 {
		glog.Warningf("Inserting delay before AWS request (%s) to avoid RequestLimitExceeded: %s",
			describeRequest(r), delay.String())
		r.Config.SleepDelay(delay)

		// Avoid clock skew problems
		r.Time = now
	}
}

// Return a user-friendly string describing the request, for use in log messages
func describeRequest(r *request.Request) string {
	service := r.ClientInfo.ServiceName

	name := "?"
	if r.Operation != nil {
		name = r.Operation.Name
	}

	return service + "::" + name
}

// Added to the AfterRetry chain; called after any error
func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) {
	if r.Error == nil {
		return
	}
	awsError, ok := r.Error.(awserr.Error)
	if !ok {
		return
	}
	if awsError.Code() == "RequestLimitExceeded" {
		c.backoff.ReportError()
		glog.Warningf("Got RequestLimitExceeded error on AWS request (%s)",
			describeRequest(r))
	}
}

// Backoff manages a backoff that varies based on the recently observed failures
type Backoff struct {
	decayIntervalSeconds int64
	decayFraction        float64
	maxDelay             time.Duration

	mutex sync.Mutex

	// We count all requests & the number of requests which hit a
	// RequestLimit.  We only really care about 'recent' requests, so we
	// decay the counts exponentially to bias towards recent values.
	countErrorsRequestLimit float32
	countRequests           float32
	lastDecay               int64
}

func (b *Backoff) init(decayIntervalSeconds int, decayFraction float64, maxDelay time.Duration) {
	b.lastDecay = time.Now().Unix()
	// Bias so that if the first request hits the limit we don't immediately apply the full delay
	b.countRequests = 4
	b.decayIntervalSeconds = int64(decayIntervalSeconds)
	b.decayFraction = decayFraction
	b.maxDelay = maxDelay
}

// Computes the delay required for a request, also updating internal state to count this request
func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	// Apply exponential decay to the counters
	timeDeltaSeconds := now.Unix() - b.lastDecay
	if timeDeltaSeconds > b.decayIntervalSeconds {
		intervals := float64(timeDeltaSeconds) / float64(b.decayIntervalSeconds)
		decay := float32(math.Pow(b.decayFraction, intervals))
		b.countErrorsRequestLimit *= decay
		b.countRequests *= decay
		b.lastDecay = now.Unix()
	}

	// Count this request
	b.countRequests += 1.0

	// Compute the failure rate
	errorFraction := float32(0.0)
	if b.countRequests > 0.5 {
		// Avoid tiny residuals & rounding errors
		errorFraction = b.countErrorsRequestLimit / b.countRequests
	}

	// Ignore a low fraction of errors
	// This also allows them to time-out
	if errorFraction < 0.1 {
		return time.Duration(0)
	}

	// Delay by the max delay multiplied by the recent error rate
	// (i.e. we apply a linear delay function)
	// TODO: This is pretty arbitrary
	delay := time.Nanosecond * time.Duration(float32(b.maxDelay.Nanoseconds())*errorFraction)
	// Round down to the nearest second for sanity
	return time.Second * time.Duration(int(delay.Seconds()))
}

// Called when we observe a throttling error
func (b *Backoff) ReportError() {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	b.countErrorsRequestLimit += 1.0
}
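ComputeDelayForRequest combines two ideas: the counters decay by decayFraction^intervals once more than decayIntervalSeconds have elapsed, and the delay is linear in the recent error fraction (maxDelay * errorFraction, rounded down to whole seconds). Note that decay alone never changes the fraction, since both counters shrink by the same factor; only newly counted successful requests dilute it. A worked example of the arithmetic, using invented counts for illustration:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Constants from the commit.
	const (
		decayIntervalSeconds = 20.0
		decayFraction        = 0.8
		maxDelay             = 60 * time.Second
	)

	// Invented counts: the initial bias of 4 "successes" plus 10 requests
	// that all hit RequestLimitExceeded.
	countRequests := 4.0 + 10.0
	countErrors := 10.0

	// Suppose 60 quiet seconds then pass: three decay intervals.
	intervals := 60.0 / decayIntervalSeconds
	decay := math.Pow(decayFraction, intervals) // 0.8^3 = 0.512
	countRequests *= decay
	countErrors *= decay

	// Both counters shrank by the same factor, so the fraction is unchanged.
	errorFraction := countErrors / countRequests // 10/14 ≈ 0.714

	// Linear delay, rounded down to whole seconds as in the commit.
	delay := time.Duration(float64(maxDelay) * errorFraction).Truncate(time.Second)
	fmt.Printf("errorFraction=%.2f delay=%s\n", errorFraction, delay) // ≈0.71, 42s
}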
pkg/cloudprovider/providers/aws/retry_handler_test.go (new file, 135 lines)
@@ -0,0 +1,135 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
	"testing"
	"time"
)

// There follows a group of tests for the backoff logic.  There's nothing
// particularly special about the values chosen: if we tweak the values in the
// backoff logic then we might well have to update the tests.  However the key
// behavioural elements should remain (e.g. no errors => no backoff), and these
// are each tested by one of the tests below.

// Test that we don't apply any delays when there are no errors
func TestBackoffNoErrors(t *testing.T) {
	b := &Backoff{}
	b.init(decayIntervalSeconds, decayFraction, maxDelay)

	now := time.Now()
	for i := 0; i < 100; i++ {
		d := b.ComputeDelayForRequest(now)
		if d.Nanoseconds() != 0 {
			t.Fatalf("unexpected delay during no-error case")
		}
		now = now.Add(time.Second)
	}
}

// Test that we always apply a delay when there are errors, and also that we
// don't "flap" - that our own delay doesn't cause us to oscillate between
// delay and no-delay.
func TestBackoffAllErrors(t *testing.T) {
	b := &Backoff{}
	b.init(decayIntervalSeconds, decayFraction, maxDelay)

	now := time.Now()
	// Warm up
	for i := 0; i < 10; i++ {
		_ = b.ComputeDelayForRequest(now)
		b.ReportError()
		now = now.Add(time.Second)
	}

	for i := 0; i < 100; i++ {
		d := b.ComputeDelayForRequest(now)
		b.ReportError()
		if d.Seconds() < 5 {
			t.Fatalf("unexpected short-delay during all-error case: %v", d)
		}
		t.Logf("delay @%d %v", i, d)
		now = now.Add(d)
	}
}

// Test that we do come close to our max delay, when we see all errors at 1
// second intervals (this simulates multiple concurrent requests, because we
// don't wait for delay in between requests)
func TestBackoffHitsMax(t *testing.T) {
	b := &Backoff{}
	b.init(decayIntervalSeconds, decayFraction, maxDelay)

	now := time.Now()
	for i := 0; i < 100; i++ {
		_ = b.ComputeDelayForRequest(now)
		b.ReportError()
		now = now.Add(time.Second)
	}

	for i := 0; i < 10; i++ {
		d := b.ComputeDelayForRequest(now)
		b.ReportError()
		if float32(d.Nanoseconds()) < (float32(maxDelay.Nanoseconds()) * 0.95) {
			t.Fatalf("expected delay to be >= 95 percent of max delay, was %v", d)
		}
		t.Logf("delay @%d %v", i, d)
		now = now.Add(time.Second)
	}
}

// Test that after a phase of errors, we eventually stop applying a delay once there are
// no more errors.
func TestBackoffRecovers(t *testing.T) {
	b := &Backoff{}
	b.init(decayIntervalSeconds, decayFraction, maxDelay)

	now := time.Now()

	// Phase of all-errors
	for i := 0; i < 100; i++ {
		_ = b.ComputeDelayForRequest(now)
		b.ReportError()
		now = now.Add(time.Second)
	}

	for i := 0; i < 10; i++ {
		d := b.ComputeDelayForRequest(now)
		b.ReportError()
		if d.Seconds() < 5 {
			t.Fatalf("unexpected short-delay during all-error phase: %v", d)
		}
		t.Logf("error phase delay @%d %v", i, d)
		now = now.Add(time.Second)
	}

	// Phase of no errors
	for i := 0; i < 100; i++ {
		_ = b.ComputeDelayForRequest(now)
		now = now.Add(3 * time.Second)
	}

	for i := 0; i < 10; i++ {
		d := b.ComputeDelayForRequest(now)
		if d.Seconds() != 0 {
			t.Fatalf("unexpected delay during error recovery phase: %v", d)
		}
		t.Logf("no-error phase delay @%d %v", i, d)
		now = now.Add(time.Second)
	}
}
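The tests cover the no-error, all-error, max-delay, and recovery cases. One scenario they leave out is a sustained partial error rate; a hypothetical test in the same style (not part of the commit) would expect the delay to stay below maxDelay scaled by the error fraction, i.e. under about 30 seconds at a 50% rate:

// Hypothetical sketch, not from the commit: a mixed-workload variant of the
// tests above, assuming a sustained 50% RequestLimitExceeded rate.
func TestBackoffPartialErrors(t *testing.T) {
	b := &Backoff{}
	b.init(decayIntervalSeconds, decayFraction, maxDelay)

	now := time.Now()
	for i := 0; i < 100; i++ {
		d := b.ComputeDelayForRequest(now)
		if i%2 == 0 {
			b.ReportError() // every second request hits the limit
		}
		// Delay is linear in the error fraction, which stays below 0.5 here
		// (the init bias counts as extra successes), so it should never
		// exceed half of the max delay.
		if d > maxDelay/2 {
			t.Fatalf("delay exceeded the expected ~50%% ceiling: %v", d)
		}
		t.Logf("delay @%d %v", i, d)
		now = now.Add(time.Second)
	}
}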