From b269e8f43cfd17edb32ff3107a2b5ab1fcb4ad8e Mon Sep 17 00:00:00 2001
From: Justin Santa Barbara
Date: Wed, 6 Jan 2016 11:41:16 -0500
Subject: [PATCH 1/2] AWS: Delay all AWS calls when we observe
 RequestLimitExceeded errors

This applies a cross-request time delay when we observe RequestLimitExceeded
errors, unlike the default library behaviour which only applies a
*per-request* backoff.

Issue #12121
---
 pkg/cloudprovider/providers/aws/aws.go        |  54 +++++-
 .../providers/aws/retry_handler.go            | 154 ++++++++++++++++++
 .../providers/aws/retry_handler_test.go       | 135 +++++++++++++++
 3 files changed, 338 insertions(+), 5 deletions(-)
 create mode 100644 pkg/cloudprovider/providers/aws/retry_handler.go
 create mode 100644 pkg/cloudprovider/providers/aws/retry_handler_test.go

diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go
index 811c2e107e8..683eedf84ad 100644
--- a/pkg/cloudprovider/providers/aws/aws.go
+++ b/pkg/cloudprovider/providers/aws/aws.go
@@ -228,13 +228,57 @@ type awsSdkEC2 struct {
 
 type awsSDKProvider struct {
     creds *credentials.Credentials
+
+    mutex          sync.Mutex
+    regionDelayers map[string]*CrossRequestRetryDelay
 }
 
-func addHandlers(h *request.Handlers) {
+func newAWSSDKProvider(creds *credentials.Credentials) *awsSDKProvider {
+    return &awsSDKProvider{
+        creds:          creds,
+        regionDelayers: make(map[string]*CrossRequestRetryDelay),
+    }
+}
+
+func (p *awsSDKProvider) addHandlers(regionName string, h *request.Handlers) {
     h.Sign.PushFrontNamed(request.NamedHandler{
         Name: "k8s/logger",
         Fn:   awsHandlerLogger,
     })
+
+    delayer := p.getCrossRequestRetryDelay(regionName)
+    if delayer != nil {
+        h.Sign.PushFrontNamed(request.NamedHandler{
+            Name: "k8s/delay-presign",
+            Fn:   delayer.BeforeSign,
+        })
+
+        h.AfterRetry.PushFrontNamed(request.NamedHandler{
+            Name: "k8s/delay-afterretry",
+            Fn:   delayer.AfterRetry,
+        })
+    }
+}
+
+// Get a CrossRequestRetryDelay, scoped to the region, not to the request.
+// This means that when we hit a limit on a call, we will delay _all_ calls to the API.
+// We do this to protect the AWS account from becoming overloaded and effectively locked.
+// We also log when we hit request limits.
+// Note that this delays the current goroutine; this is bad behaviour and will
+// likely cause k8s to become slow or unresponsive for cloud operations.
+// However, this throttle is intended only as a last resort.  When we observe
+// this throttling, we need to address the root cause (e.g. add a delay to a
+// controller retry loop)
+func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequestRetryDelay {
+    p.mutex.Lock()
+    defer p.mutex.Unlock()
+
+    delayer, found := p.regionDelayers[regionName]
+    if !found {
+        delayer = NewCrossRequestRetryDelay()
+        p.regionDelayers[regionName] = delayer
+    }
+    return delayer
 }
 
 func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
@@ -243,7 +287,7 @@ func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
         Credentials: p.creds,
     }))
 
-    addHandlers(&service.Handlers)
+    p.addHandlers(regionName, &service.Handlers)
 
     ec2 := &awsSdkEC2{
         ec2: service,
@@ -257,7 +301,7 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) {
         Credentials: p.creds,
     }))
 
-    addHandlers(&elbClient.Handlers)
+    p.addHandlers(regionName, &elbClient.Handlers)
 
     return elbClient, nil
 }
@@ -268,7 +312,7 @@ func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) {
         Credentials: p.creds,
     }))
 
-    addHandlers(&client.Handlers)
+    p.addHandlers(regionName, &client.Handlers)
 
     return client, nil
 }
@@ -458,7 +502,7 @@ func init() {
             },
             &credentials.SharedCredentialsProvider{},
         })
-        aws := &awsSDKProvider{creds: creds}
+        aws := newAWSSDKProvider(creds)
         return newAWSCloud(config, aws)
     })
 }
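
As context for the handler wiring above, here is a minimal standalone sketch (illustrative only, not part of this patch) of attaching the same delayer to an aws-sdk-go EC2 client; the k8s.io/kubernetes import path, the region, and the main-package wrapper are assumptions for the example:

    package main

    import (
        "fmt"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/request"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/ec2"

        // Assumed import path for the package this patch modifies.
        k8saws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
    )

    func main() {
        // One delayer per region; every client for that region shares it,
        // which is what makes the back-off cross-request rather than the
        // SDK's default per-request retry.
        delayer := k8saws.NewCrossRequestRetryDelay()

        svc := ec2.New(session.New(&aws.Config{Region: aws.String("us-east-1")}))

        // BeforeSign runs before each request is signed, so the sleep happens
        // before the request goes on the wire.
        svc.Handlers.Sign.PushFrontNamed(request.NamedHandler{
            Name: "k8s/delay-presign",
            Fn:   delayer.BeforeSign,
        })

        // AfterRetry sees every error and records RequestLimitExceeded.
        svc.Handlers.AfterRetry.PushFrontNamed(request.NamedHandler{
            Name: "k8s/delay-afterretry",
            Fn:   delayer.AfterRetry,
        })

        fmt.Println("delay handlers attached to the us-east-1 EC2 client")
    }

Pushing the pre-sign handler to the front of the Sign list keeps the delay ahead of the SDK's own signing, which is also why BeforeSign (below) resets r.Time after sleeping.
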
diff --git a/pkg/cloudprovider/providers/aws/retry_handler.go b/pkg/cloudprovider/providers/aws/retry_handler.go
new file mode 100644
index 00000000000..d6d68812899
--- /dev/null
+++ b/pkg/cloudprovider/providers/aws/retry_handler.go
@@ -0,0 +1,154 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package aws
+
+import (
+    "math"
+    "sync"
+    "time"
+
+    "github.com/aws/aws-sdk-go/aws/awserr"
+    "github.com/aws/aws-sdk-go/aws/request"
+    "github.com/golang/glog"
+)
+
+const (
+    decayIntervalSeconds = 20
+    decayFraction        = 0.8
+    maxDelay             = 60 * time.Second
+)
+
+// CrossRequestRetryDelay inserts delays before AWS calls, when we are observing RequestLimitExceeded errors
+// Note that we share a CrossRequestRetryDelay across multiple AWS requests; this is a process-wide back-off,
+// whereas the aws-sdk-go implements a per-request exponential backoff/retry
+type CrossRequestRetryDelay struct {
+    backoff Backoff
+}
+
+// Create a new CrossRequestRetryDelay
+func NewCrossRequestRetryDelay() *CrossRequestRetryDelay {
+    c := &CrossRequestRetryDelay{}
+    c.backoff.init()
+    return c
+}
+
+// Added to the Sign chain; called before each request
+func (c *CrossRequestRetryDelay) BeforeSign(r *request.Request) {
+    now := time.Now()
+    delay := c.backoff.ComputeDelayForRequest(now)
+    if delay > 0 {
+        glog.Warningf("Inserting delay before AWS request (%s) to avoid RequestLimitExceeded: %s",
+            describeRequest(r), delay.String())
+        r.Config.SleepDelay(delay)
+
+        // Avoid clock skew problems
+        r.Time = now
+    }
+}
+
+// Return a user-friendly string describing the request, for use in log messages
+func describeRequest(r *request.Request) string {
+    service := r.ClientInfo.ServiceName
+
+    name := "?"
+    if r.Operation != nil {
+        name = r.Operation.Name
+    }
+
+    return service + "::" + name
+}
+
+// Added to the AfterRetry chain; called after any error
+func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) {
+    if r.Error == nil {
+        return
+    }
+    awsError, ok := r.Error.(awserr.Error)
+    if !ok {
+        return
+    }
+    if awsError.Code() == "RequestLimitExceeded" {
+        c.backoff.ReportError()
+        glog.Warningf("Got RequestLimitExceeded error on AWS request (%s)",
+            describeRequest(r))
+    }
+}
+
+// Backoff manages a backoff that varies based on the recently observed failures
+type Backoff struct {
+    mutex sync.Mutex
+
+    // We count all requests & the number of requests which hit a
+    // RequestLimit.  We only really care about 'recent' requests, so we
+    // decay the counts exponentially to bias towards recent values.
+    countErrorsRequestLimit float32
+    countRequests           float32
+    lastDecay               int64
+}
+
+func (b *Backoff) init() {
+    b.lastDecay = time.Now().Unix()
+    // Bias so that if the first request hits the limit we don't immediately apply the full delay
+    b.countRequests = 4
+}
+
+// Computes the delay required for a request, also updating internal state to count this request
+func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration {
+    b.mutex.Lock()
+    defer b.mutex.Unlock()
+
+    // Apply exponential decay to the counters
+    timeDeltaSeconds := now.Unix() - b.lastDecay
+    if timeDeltaSeconds > decayIntervalSeconds {
+        intervals := float64(timeDeltaSeconds) / float64(decayIntervalSeconds)
+        decay := float32(math.Pow(decayFraction, intervals))
+        b.countErrorsRequestLimit *= decay
+        b.countRequests *= decay
+        b.lastDecay = now.Unix()
+    }
+
+    // Count this request
+    b.countRequests += 1.0
+
+    // Compute the failure rate
+    errorFraction := float32(0.0)
+    if b.countRequests > 0.5 {
+        // Avoid tiny residuals & rounding errors
+        errorFraction = b.countErrorsRequestLimit / b.countRequests
+    }
+
+    // Ignore a low fraction of errors
+    // This also allows them to time-out
+    if errorFraction < 0.1 {
+        return time.Duration(0)
+    }
+
+    // Delay by the max delay multiplied by the recent error rate
+    // (i.e. we apply a linear delay function)
+    // TODO: This is pretty arbitrary
+    delay := time.Nanosecond * time.Duration(float32(maxDelay.Nanoseconds())*errorFraction)
+    // Round down to the nearest second for sanity
+    return time.Second * time.Duration(int(delay.Seconds()))
+}
+
+// Called when we observe a throttling error
+func (b *Backoff) ReportError() {
+    b.mutex.Lock()
+    defer b.mutex.Unlock()
+
+    b.countErrorsRequestLimit += 1.0
+}
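
To make the arithmetic above concrete, here is a hypothetical scratch test (it would sit alongside the tests added below, and is not part of the patch) walking through one delay computation. Note that it uses the zero-argument init() from this commit; the second commit in this series changes init to take parameters:

    // Hypothetical worked example, not part of the patch; relies on the
    // "testing" and "time" imports already present in the test file.
    func TestBackoffWorkedExample(t *testing.T) {
        b := &Backoff{}
        b.init()

        now := time.Now()

        // First request: countRequests starts at the bias value of 4 and there
        // are no errors yet, so errorFraction is 0 and no delay is inserted.
        if d := b.ComputeDelayForRequest(now); d != 0 {
            t.Fatalf("expected no delay, got %v", d)
        }

        // AfterRetry would call ReportError on a RequestLimitExceeded error.
        b.ReportError()

        // Second request: errorFraction is 1/6 (about 0.17), which is above
        // the 0.1 threshold, so the delay is roughly 0.17 * maxDelay (60s),
        // rounded down to whole seconds: 10s.
        if d := b.ComputeDelayForRequest(now); d != 10*time.Second {
            t.Fatalf("expected a 10s delay, got %v", d)
        }
    }

The same linear relationship is what TestBackoffHitsMax below relies on: when nearly every recent request errors, the fraction approaches 1 and the delay approaches maxDelay.
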
diff --git a/pkg/cloudprovider/providers/aws/retry_handler_test.go b/pkg/cloudprovider/providers/aws/retry_handler_test.go
new file mode 100644
index 00000000000..5a615e2637d
--- /dev/null
+++ b/pkg/cloudprovider/providers/aws/retry_handler_test.go
@@ -0,0 +1,135 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package aws
+
+import (
+    "testing"
+    "time"
+)
+
+// There follows a group of tests for the backoff logic.  There's nothing
+// particularly special about the values chosen: if we tweak the values in the
+// backoff logic then we might well have to update the tests.  However the key
+// behavioural elements should remain (e.g. no errors => no backoff), and these
+// are each tested by one of the tests below.
+
+// Test that we don't apply any delays when there are no errors
+func TestBackoffNoErrors(t *testing.T) {
+    b := &Backoff{}
+    b.init()
+
+    now := time.Now()
+    for i := 0; i < 100; i++ {
+        d := b.ComputeDelayForRequest(now)
+        if d.Nanoseconds() != 0 {
+            t.Fatalf("unexpected delay during no-error case")
+        }
+        now = now.Add(time.Second)
+    }
+}
+
+// Test that we always apply a delay when there are errors, and also that we
+// don't "flap" - that our own delay doesn't cause us to oscillate between
+// delay and no-delay.
+func TestBackoffAllErrors(t *testing.T) {
+    b := &Backoff{}
+    b.init()
+
+    now := time.Now()
+    // Warm up
+    for i := 0; i < 10; i++ {
+        _ = b.ComputeDelayForRequest(now)
+        b.ReportError()
+        now = now.Add(time.Second)
+    }
+
+    for i := 0; i < 100; i++ {
+        d := b.ComputeDelayForRequest(now)
+        b.ReportError()
+        if d.Seconds() < 5 {
+            t.Fatalf("unexpected short-delay during all-error case: %v", d)
+        }
+        t.Logf("delay @%d %v", i, d)
+        now = now.Add(d)
+    }
+}
+
+// Test that we do come close to our max delay, when we see all errors at 1
+// second intervals (this simulates multiple concurrent requests, because we
+// don't wait for delay in between requests)
+func TestBackoffHitsMax(t *testing.T) {
+    b := &Backoff{}
+    b.init()
+
+    now := time.Now()
+    for i := 0; i < 100; i++ {
+        _ = b.ComputeDelayForRequest(now)
+        b.ReportError()
+        now = now.Add(time.Second)
+    }
+
+    for i := 0; i < 10; i++ {
+        d := b.ComputeDelayForRequest(now)
+        b.ReportError()
+        if float32(d.Nanoseconds()) < (float32(maxDelay.Nanoseconds()) * 0.95) {
+            t.Fatalf("expected delay to be >= 95 percent of max delay, was %v", d)
+        }
+        t.Logf("delay @%d %v", i, d)
+        now = now.Add(time.Second)
+    }
+}
+
+// Test that after a phase of errors, we eventually stop applying a delay once there are
+// no more errors.
+func TestBackoffRecovers(t *testing.T) {
+    b := &Backoff{}
+    b.init()
+
+    now := time.Now()
+
+    // Phase of all-errors
+    for i := 0; i < 100; i++ {
+        _ = b.ComputeDelayForRequest(now)
+        b.ReportError()
+        now = now.Add(time.Second)
+    }
+
+    for i := 0; i < 10; i++ {
+        d := b.ComputeDelayForRequest(now)
+        b.ReportError()
+        if d.Seconds() < 5 {
+            t.Fatalf("unexpected short-delay during all-error phase: %v", d)
+        }
+        t.Logf("error phase delay @%d %v", i, d)
+        now = now.Add(time.Second)
+    }
+
+    // Phase of no errors
+    for i := 0; i < 100; i++ {
+        _ = b.ComputeDelayForRequest(now)
+        now = now.Add(3 * time.Second)
+    }
+
+    for i := 0; i < 10; i++ {
+        d := b.ComputeDelayForRequest(now)
+        if d.Seconds() != 0 {
+            t.Fatalf("unexpected delay during error recovery phase: %v", d)
+        }
+        t.Logf("no-error phase delay @%d %v", i, d)
+        now = now.Add(time.Second)
+    }
+}

From f8af47b64527bf6c87ced82a90bb942b21537136 Mon Sep 17 00:00:00 2001
From: Justin Santa Barbara
Date: Sun, 21 Feb 2016 20:17:22 -0500
Subject: [PATCH 2/2] AWS: Pass globals into Backoff struct

Thanks for the suggestion bprashanth!
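
A hypothetical sketch of what this parameterisation enables (not part of the patch): a caller, for example a future unit test, can now build a Backoff with its own tuning values instead of the package-level constants. It assumes the init signature introduced in the diff below; the values are made up for illustration:

    // Hypothetical helper in the same package; the values are made up.
    func newCustomBackoff() *Backoff {
        b := &Backoff{}
        // Decay the counters every 5 seconds by a factor of 0.5, and cap the
        // inserted delay at 10 seconds, rather than using the package defaults.
        b.init(5, 0.5, 10*time.Second)
        return b
    }

The existing tests below are updated to pass the existing package constants explicitly.
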
---
 .../providers/aws/retry_handler.go      | 19 +++++++++++++------
 .../providers/aws/retry_handler_test.go |  8 ++++----
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/pkg/cloudprovider/providers/aws/retry_handler.go b/pkg/cloudprovider/providers/aws/retry_handler.go
index d6d68812899..6e6657bf0b4 100644
--- a/pkg/cloudprovider/providers/aws/retry_handler.go
+++ b/pkg/cloudprovider/providers/aws/retry_handler.go
@@ -42,7 +42,7 @@ type CrossRequestRetryDelay struct {
 // Create a new CrossRequestRetryDelay
 func NewCrossRequestRetryDelay() *CrossRequestRetryDelay {
     c := &CrossRequestRetryDelay{}
-    c.backoff.init()
+    c.backoff.init(decayIntervalSeconds, decayFraction, maxDelay)
     return c
 }
 
@@ -90,6 +90,10 @@ func (c *CrossRequestRetryDelay) AfterRetry(r *request.Request) {
 
 // Backoff manages a backoff that varies based on the recently observed failures
 type Backoff struct {
+    decayIntervalSeconds int64
+    decayFraction        float64
+    maxDelay             time.Duration
+
     mutex sync.Mutex
 
     // We count all requests & the number of requests which hit a
@@ -100,10 +104,13 @@ type Backoff struct {
     lastDecay               int64
 }
 
-func (b *Backoff) init() {
+func (b *Backoff) init(decayIntervalSeconds int, decayFraction float64, maxDelay time.Duration) {
     b.lastDecay = time.Now().Unix()
     // Bias so that if the first request hits the limit we don't immediately apply the full delay
     b.countRequests = 4
+    b.decayIntervalSeconds = int64(decayIntervalSeconds)
+    b.decayFraction = decayFraction
+    b.maxDelay = maxDelay
 }
 
 // Computes the delay required for a request, also updating internal state to count this request
@@ -113,9 +120,9 @@ func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration {
 
     // Apply exponential decay to the counters
     timeDeltaSeconds := now.Unix() - b.lastDecay
-    if timeDeltaSeconds > decayIntervalSeconds {
-        intervals := float64(timeDeltaSeconds) / float64(decayIntervalSeconds)
-        decay := float32(math.Pow(decayFraction, intervals))
+    if timeDeltaSeconds > b.decayIntervalSeconds {
+        intervals := float64(timeDeltaSeconds) / float64(b.decayIntervalSeconds)
+        decay := float32(math.Pow(b.decayFraction, intervals))
         b.countErrorsRequestLimit *= decay
         b.countRequests *= decay
         b.lastDecay = now.Unix()
@@ -140,7 +147,7 @@ func (b *Backoff) ComputeDelayForRequest(now time.Time) time.Duration {
     // Delay by the max delay multiplied by the recent error rate
     // (i.e. we apply a linear delay function)
     // TODO: This is pretty arbitrary
-    delay := time.Nanosecond * time.Duration(float32(maxDelay.Nanoseconds())*errorFraction)
+    delay := time.Nanosecond * time.Duration(float32(b.maxDelay.Nanoseconds())*errorFraction)
     // Round down to the nearest second for sanity
     return time.Second * time.Duration(int(delay.Seconds()))
 }
diff --git a/pkg/cloudprovider/providers/aws/retry_handler_test.go b/pkg/cloudprovider/providers/aws/retry_handler_test.go
index 5a615e2637d..e02b52d6db1 100644
--- a/pkg/cloudprovider/providers/aws/retry_handler_test.go
+++ b/pkg/cloudprovider/providers/aws/retry_handler_test.go
@@ -30,7 +30,7 @@ import (
 // Test that we don't apply any delays when there are no errors
 func TestBackoffNoErrors(t *testing.T) {
     b := &Backoff{}
-    b.init()
+    b.init(decayIntervalSeconds, decayFraction, maxDelay)
 
     now := time.Now()
     for i := 0; i < 100; i++ {
@@ -47,7 +47,7 @@ func TestBackoffNoErrors(t *testing.T) {
 // delay and no-delay.
 func TestBackoffAllErrors(t *testing.T) {
     b := &Backoff{}
-    b.init()
+    b.init(decayIntervalSeconds, decayFraction, maxDelay)
 
     now := time.Now()
     // Warm up
@@ -73,7 +73,7 @@ func TestBackoffAllErrors(t *testing.T) {
 // don't wait for delay in between requests)
 func TestBackoffHitsMax(t *testing.T) {
     b := &Backoff{}
-    b.init()
+    b.init(decayIntervalSeconds, decayFraction, maxDelay)
 
     now := time.Now()
     for i := 0; i < 100; i++ {
@@ -97,7 +97,7 @@ func TestBackoffHitsMax(t *testing.T) {
 // no more errors.
 func TestBackoffRecovers(t *testing.T) {
     b := &Backoff{}
-    b.init()
+    b.init(decayIntervalSeconds, decayFraction, maxDelay)
 
     now := time.Now()