AWS: Delay all AWS calls when we observe RequestLimitExceeded errors
This applies a cross-request time delay when we observe RequestLimitExceeded errors; the default aws-sdk-go behaviour applies only a *per-request* backoff. Issue #12121
This commit is contained in:
parent f366baeaeb
commit b269e8f43c
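
For context: aws-sdk-go (v1) exposes per-request lifecycle hooks via request.Handlers, and this change splices named handlers into the Sign and AfterRetry chains. Below is a minimal standalone sketch of that mechanism, not part of this commit; the region, handler names, and DescribeInstances call are illustrative placeholders, and credentials are assumed to come from the environment.

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	svc := ec2.New(session.New(&aws.Config{Region: aws.String("us-east-1")}))

	// Runs before each request is signed - the same chain this commit uses
	// for its "k8s/delay-presign" handler.
	svc.Handlers.Sign.PushFrontNamed(request.NamedHandler{
		Name: "example/before-sign",
		Fn: func(r *request.Request) {
			fmt.Printf("about to sign %s::%s\n", r.ClientInfo.ServiceName, r.Operation.Name)
		},
	})

	// Runs after the SDK's own per-request retry handling - the chain used
	// for "k8s/delay-afterretry".
	svc.Handlers.AfterRetry.PushFrontNamed(request.NamedHandler{
		Name: "example/after-retry",
		Fn: func(r *request.Request) {
			if r.Error != nil {
				fmt.Printf("request error: %v\n", r.Error)
			}
		},
	})

	svc.DescribeInstances(&ec2.DescribeInstancesInput{})
}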
pkg/cloudprovider/providers/aws/aws.go
@@ -228,13 +228,57 @@ type awsSdkEC2 struct {
 
 type awsSDKProvider struct {
 	creds *credentials.Credentials
+
+	mutex          sync.Mutex
+	regionDelayers map[string]*CrossRequestRetryDelay
 }
 
-func addHandlers(h *request.Handlers) {
+func newAWSSDKProvider(creds *credentials.Credentials) *awsSDKProvider {
+	return &awsSDKProvider{
+		creds:          creds,
+		regionDelayers: make(map[string]*CrossRequestRetryDelay),
+	}
+}
+
+func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) {
 	h.Sign.PushFrontNamed(request.NamedHandler{
 		Name: "k8s/logger",
 		Fn:   awsHandlerLogger,
 	})
+
+	delayer := p.getCrossRequestRetryDelay(regionName)
+	if delayer != nil {
+		h.Sign.PushFrontNamed(request.NamedHandler{
+			Name: "k8s/delay-presign",
+			Fn:   delayer.BeforeSign,
+		})
+
+		h.AfterRetry.PushFrontNamed(request.NamedHandler{
+			Name: "k8s/delay-afterretry",
+			Fn:   delayer.AfterRetry,
+		})
+	}
+}
+
+// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
+// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
+// We do this to protect the AWS account from becoming overloaded and effectively locked.
+// We also log when we hit request limits.
+// Note that this delays the current goroutine; this is bad behaviour and will
+// likely cause k8s to become slow or unresponsive for cloud operations.
+// However, this throttle is intended only as a last resort. When we observe
+// this throttling, we need to address the root cause (e.g. add a delay to a
+// controller retry loop)
+func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	delayer, found := p.regionDelayers[regionName]
+	if !found {
+		delayer = NewCrossRequestRetryDelay()
+		p.regionDelayers[regionName] = delayer
+	}
+	return delayer
 }
 
 func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
@@ -243,7 +287,7 @@ func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
 		Credentials: p.creds,
 	}))
 
-	addHandlers(&service.Handlers)
+	p.addHandlers(regionName, &service.Handlers)
 
 	ec2 := &awsSdkEC2{
 		ec2: service,
@@ -257,7 +301,7 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) {
 		Credentials: p.creds,
 	}))
 
-	addHandlers(&elbClient.Handlers)
+	p.addHandlers(regionName, &elbClient.Handlers)
 
 	return elbClient, nil
 }
@@ -268,7 +312,7 @@ func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) {
 		Credentials: p.creds,
 	}))
 
-	addHandlers(&client.Handlers)
+	p.addHandlers(regionName, &client.Handlers)
 
 	return client, nil
 }
@@ -458,7 +502,7 @@ func init() {
 			},
 			&credentials.SharedCredentialsProvider{},
 		})
-		aws := &awsSDKProvider{creds: creds}
+		aws := newAWSSDKProvider(creds)
 		return newAWSCloud(config, aws)
 	})
 }
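Design note: EC2 API request limits are applied per account per region, so the delayer is scoped to the region rather than to an individual client or request. The mutex-guarded regionDelayers map lazily creates one CrossRequestRetryDelay per region and shares it across every client (EC2, ELB, autoscaling) that the provider builds, which is what makes the back-off cross-request.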
pkg/cloudprovider/providers/aws/retry_handler.go (new file, 154 lines)
@@ -0,0 +1,154 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package aws
+
+import (
+	"math"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/aws/request"
+	"github.com/golang/glog"
+)
+
+const (
+	decayIntervalSeconds = 20
+	decayFraction        = 0.8
+	maxDelay             = 60 * time.Second
+)
+
+// CrossRequestRetryDelay inserts delays before AWS calls, when we are observing RequestLimitExceeded errors
+// Note that we share a CrossRequestRetryDelay across multiple AWS requests; this is a process-wide back-off,
+// whereas the aws-sdk-go implements a per-request exponential backoff/retry
+type CrossRequestRetryDelay struct {
+	backoff Backoff
+}
+
+// Create a new CrossRequestRetryDelay
+func NewCrossRequestRetryDelay() *CrossRequestRetryDelay {
+	c := &CrossRequestRetryDelay{}
+	c.backoff.init()
+	return c
+}
+
+// Added to the Sign chain; called before each request
+func (c *CrossRequestRetryDelay) BeforeSign(r *request.Request) {
+	now := time.Now()
+	delay := c.backoff.ComputeDelayForRequest(now)
+	if delay > 0 {
+		glog.Warningf("Inserting delay before AWS request (%s) to avoid RequestLimitExceeded: %s",
+			describeRequest(r), delay.String())
+		r.Config.SleepDelay(delay)
+
+		// Avoid clock skew problems
+		r.Time = now
+	}
+}
+
+// Return a user-friendly string describing the request, for use in log messages
+func describeRequest(r *request.Request) string {
+	service := r.ClientInfo.ServiceName
+
+	name := "?"
+	if r.Operation != nil {
+		name = r.Operation.Name
+	}
+
+	return service + "::" + name
+}
+
+// Added to the AfterRetry chain; called after any error
+func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) {
+	if r.Error == nil {
+		return
+	}
+	awsError, ok := r.Error.(awserr.Error)
+	if !ok {
+		return
+	}
+	if awsError.Code() == "RequestLimitExceeded" {
+		c.backoff.ReportError()
+		glog.Warningf("Got RequestLimitExceeded error on AWS request (%s)",
+			describeRequest(r))
+	}
+}
+
+// Backoff manages a backoff that varies based on the recently observed failures
+type Backoff struct {
+	mutex sync.Mutex
+
+	// We count all requests & the number of requests which hit a
+	// RequestLimit.  We only really care about 'recent' requests, so we
+	// decay the counts exponentially to bias towards recent values.
+	countErrorsRequestLimit float32
+	countRequests           float32
+	lastDecay               int64
+}
+
+func (b *Backoff) init() {
+	b.lastDecay = time.Now().Unix()
+	// Bias so that if the first request hits the limit we don't immediately apply the full delay
+	b.countRequests = 4
+}
+
+// Computes the delay required for a request, also updating internal state to count this request
+func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration {
+	b.mutex.Lock()
+	defer b.mutex.Unlock()
+
+	// Apply exponential decay to the counters
+	timeDeltaSeconds := now.Unix() - b.lastDecay
+	if timeDeltaSeconds > decayIntervalSeconds {
+		intervals := float64(timeDeltaSeconds) / float64(decayIntervalSeconds)
+		decay := float32(math.Pow(decayFraction, intervals))
+		b.countErrorsRequestLimit *= decay
+		b.countRequests *= decay
+		b.lastDecay = now.Unix()
+	}
+
+	// Count this request
+	b.countRequests += 1.0
+
+	// Compute the failure rate
+	errorFraction := float32(0.0)
+	if b.countRequests > 0.5 {
+		// Avoid tiny residuals & rounding errors
+		errorFraction = b.countErrorsRequestLimit / b.countRequests
+	}
+
+	// Ignore a low fraction of errors
+	// This also allows them to time-out
+	if errorFraction < 0.1 {
+		return time.Duration(0)
+	}
+
+	// Delay by the max delay multiplied by the recent error rate
+	// (i.e. we apply a linear delay function)
+	// TODO: This is pretty arbitrary
+	delay := time.Nanosecond * time.Duration(float32(maxDelay.Nanoseconds())*errorFraction)
+	// Round down to the nearest second for sanity
+	return time.Second * time.Duration(int(delay.Seconds()))
+}
+
+// Called when we observe a throttling error
+func (b *Backoff) ReportError() {
+	b.mutex.Lock()
+	defer b.mutex.Unlock()
+
+	b.countErrorsRequestLimit += 1.0
+}
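To make the numbers concrete: with decayFraction 0.8 and a 20-second decay interval, the counters shrink by a factor of 0.8^(Δt/20), and because the delay function is linear, the inserted delay is simply maxDelay times the recent error fraction. A small standalone sketch of that arithmetic follows; the 60-second gap and 0.5 error fraction are made-up inputs, not values from this commit.

package main

import (
	"fmt"
	"math"
)

func main() {
	// Constants mirroring retry_handler.go above.
	const (
		decayIntervalSeconds = 20.0
		decayFraction        = 0.8
		maxDelaySeconds      = 60.0
	)

	// Hypothetical input: the counters were last decayed 60 seconds ago,
	// i.e. three decay intervals, so both counters scale by 0.8^3 = 0.512.
	elapsedSeconds := 60.0
	decay := math.Pow(decayFraction, elapsedSeconds/decayIntervalSeconds)
	fmt.Printf("decay factor after %gs: %.3f\n", elapsedSeconds, decay)

	// Hypothetical input: after decay, half the counted requests hit the
	// request limit. The linear delay function then gives half the max
	// delay: 0.5 * 60s = 30s.
	errorFraction := 0.5
	fmt.Printf("inserted delay: %.0fs\n", maxDelaySeconds*errorFraction)
}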
pkg/cloudprovider/providers/aws/retry_handler_test.go (new file, 135 lines)
@@ -0,0 +1,135 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package aws
+
+import (
+	"testing"
+	"time"
+)
+
+// There follows a group of tests for the backoff logic.  There's nothing
+// particularly special about the values chosen: if we tweak the values in the
+// backoff logic then we might well have to update the tests.  However the key
+// behavioural elements should remain (e.g. no errors => no backoff), and these
+// are each tested by one of the tests below.
+
+// Test that we don't apply any delays when there are no errors
+func TestBackoffNoErrors(t *testing.T) {
+	b := &Backoff{}
+	b.init()
+
+	now := time.Now()
+	for i := 0; i < 100; i++ {
+		d := b.ComputeDelayForRequest(now)
+		if d.Nanoseconds() != 0 {
+			t.Fatalf("unexpected delay during no-error case")
+		}
+		now = now.Add(time.Second)
+	}
+}
+
+// Test that we always apply a delay when there are errors, and also that we
+// don't "flap" - that our own delay doesn't cause us to oscillate between
+// delay and no-delay.
+func TestBackoffAllErrors(t *testing.T) {
+	b := &Backoff{}
+	b.init()
+
+	now := time.Now()
+	// Warm up
+	for i := 0; i < 10; i++ {
+		_ = b.ComputeDelayForRequest(now)
+		b.ReportError()
+		now = now.Add(time.Second)
+	}
+
+	for i := 0; i < 100; i++ {
+		d := b.ComputeDelayForRequest(now)
+		b.ReportError()
+		if d.Seconds() < 5 {
+			t.Fatalf("unexpected short-delay during all-error case: %v", d)
+		}
+		t.Logf("delay @%d %v", i, d)
+		now = now.Add(d)
+	}
+}
+
+// Test that we do come close to our max delay, when we see all errors at 1
+// second intervals (this simulates multiple concurrent requests, because we
+// don't wait for delay in between requests)
+func TestBackoffHitsMax(t *testing.T) {
+	b := &Backoff{}
+	b.init()
+
+	now := time.Now()
+	for i := 0; i < 100; i++ {
+		_ = b.ComputeDelayForRequest(now)
+		b.ReportError()
+		now = now.Add(time.Second)
+	}
+
+	for i := 0; i < 10; i++ {
+		d := b.ComputeDelayForRequest(now)
+		b.ReportError()
+		if float32(d.Nanoseconds()) < (float32(maxDelay.Nanoseconds()) * 0.95) {
+			t.Fatalf("expected delay to be >= 95 percent of max delay, was %v", d)
+		}
+		t.Logf("delay @%d %v", i, d)
+		now = now.Add(time.Second)
+	}
+}
+
+// Test that after a phase of errors, we eventually stop applying a delay once there are
+// no more errors.
+func TestBackoffRecovers(t *testing.T) {
+	b := &Backoff{}
+	b.init()
+
+	now := time.Now()
+
+	// Phase of all-errors
+	for i := 0; i < 100; i++ {
+		_ = b.ComputeDelayForRequest(now)
+		b.ReportError()
+		now = now.Add(time.Second)
+	}
+
+	for i := 0; i < 10; i++ {
+		d := b.ComputeDelayForRequest(now)
+		b.ReportError()
+		if d.Seconds() < 5 {
+			t.Fatalf("unexpected short-delay during all-error phase: %v", d)
+		}
+		t.Logf("error phase delay @%d %v", i, d)
+		now = now.Add(time.Second)
+	}
+
+	// Phase of no errors
+	for i := 0; i < 100; i++ {
+		_ = b.ComputeDelayForRequest(now)
+		now = now.Add(3 * time.Second)
+	}
+
+	for i := 0; i < 10; i++ {
+		d := b.ComputeDelayForRequest(now)
+		if d.Seconds() != 0 {
+			t.Fatalf("unexpected delay during error recovery phase: %v", d)
+		}
+		t.Logf("no-error phase delay @%d %v", i, d)
+		now = now.Add(time.Second)
+	}
+}
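These are ordinary Go tests; something like the following, run from the repository root with the path as added by this commit, should exercise all four:

go test ./pkg/cloudprovider/providers/aws/ -run TestBackoff -v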